use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{
    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
    ParquetStatisticsPolicy, RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
pub(crate) mod selection;
pub mod statistics;

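/// Builder for constructing readers that decode Parquet data into Arrow
/// [`RecordBatch`]es.
///
/// Most users should use one of its specializations, such as the synchronous
/// [`ParquetRecordBatchReaderBuilder`] defined below.
///
/// A minimal sketch of typical usage (assumes `data: Bytes` holds a complete,
/// valid Parquet file; error handling elided):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(data: Bytes) {
/// let reader = ParquetRecordBatchReaderBuilder::try_new(data)
///     .unwrap()
///     .with_batch_size(8192)
///     .build()
///     .unwrap();
/// for batch in reader {
///     let batch = batch.unwrap();
///     println!("read {} rows", batch.num_rows());
/// }
/// # }
/// ```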
pub struct ArrowReaderBuilder<T> {
    pub(crate) input: T,

    pub(crate) metadata: Arc<ParquetMetaData>,

    pub(crate) schema: SchemaRef,

    pub(crate) fields: Option<Arc<ParquetField>>,

    pub(crate) batch_size: usize,

    pub(crate) row_groups: Option<Vec<usize>>,

    pub(crate) projection: ProjectionMask,

    pub(crate) filter: Option<RowFilter>,

    pub(crate) selection: Option<RowSelection>,

    pub(crate) row_selection_policy: RowSelectionPolicy,

    pub(crate) limit: Option<usize>,

    pub(crate) offset: Option<usize>,

    pub(crate) metrics: ArrowReaderMetrics,

    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("row_selection_policy", &self.row_selection_policy)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            row_selection_policy: RowSelectionPolicy::default(),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            // Default the predicate cache to 100 MB
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    /// The batch size is clamped to the number of rows in the file.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Only read data from the provided row group indexes.
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    /// Only read data from the columns selected by the provided [`ProjectionMask`].
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Configure how internal row selections are represented (skip/select
    /// runs vs. a boolean mask) while decoding.
    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
        Self {
            row_selection_policy: policy,
            ..self
        }
    }

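    /// Provide a [`RowSelection`] to filter out rows, and avoid fetching their
    /// data into memory.
    ///
    /// A minimal sketch: skip the first 100 rows, then select the next 50
    /// (assumes the file contains at least 150 rows):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
    /// # };
    /// # fn example(builder: ParquetRecordBatchReaderBuilder<Bytes>) {
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(100),
    ///     RowSelector::select(50),
    /// ]);
    /// let reader = builder.with_row_selection(selection).build().unwrap();
    /// # }
    /// ```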
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

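    /// Provide a [`RowFilter`] to push down predicate evaluation into the
    /// decode loop, skipping the decode of rows that do not match.
    ///
    /// A minimal sketch of a predicate keeping rows where the first leaf
    /// column (assumed here to be an `Int32` column) is greater than 42:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use arrow_array::Int32Array;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
    /// # };
    /// # fn example(builder: ParquetRecordBatchReaderBuilder<Bytes>) {
    /// // The predicate is only passed the columns in its projection mask
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(mask, |batch| {
    ///     arrow::compute::kernels::cmp::gt(batch.column(0), &Int32Array::new_scalar(42))
    /// });
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()
    ///     .unwrap();
    /// # }
    /// ```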
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

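    /// Provide a limit to the number of rows to be read.
    ///
    /// The limit is applied after any [`RowSelection`] and [`RowFilter`], and
    /// after any configured offset, so it counts only rows that survive them.
    ///
    /// A minimal sketch combining offset and limit (reads at most 20 rows
    /// after skipping the first 10 selected rows):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn example(builder: ParquetRecordBatchReaderBuilder<Bytes>) {
    /// let reader = builder.with_offset(10).with_limit(20).build().unwrap();
    /// # }
    /// ```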
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Provide an offset to skip over the given number of rows.
    ///
    /// The offset is applied after any [`RowSelection`] and [`RowFilter`],
    /// and before any limit.
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    /// Provide an [`ArrowReaderMetrics`] handle used to report metrics
    /// gathered while reading.
    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    /// Set the maximum size, in bytes, of the cache used to store the results
    /// of evaluating predicates, which can avoid decoding filter columns a
    /// second time. Defaults to 100 MB.
    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

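/// Options that control how metadata and data are read for a Parquet file.
///
/// See [`ArrowReaderBuilder`] for configuring the decoding itself
/// (projection, filters, limits, and so on).
///
/// A minimal sketch of opening a file with options (assumes `data: Bytes`
/// holds a valid Parquet file):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
/// # use parquet::file::metadata::PageIndexPolicy;
/// # fn example(data: Bytes) {
/// let options = ArrowReaderOptions::new()
///     // read the page index if present, enabling more efficient filtering
///     .with_page_index_policy(PageIndexPolicy::Optional);
/// let builder =
///     ParquetRecordBatchReaderBuilder::try_new_with_options(data, options).unwrap();
/// # }
/// ```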
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// If true, ignore any Arrow schema embedded in the file's key-value metadata
    skip_arrow_metadata: bool,
    /// If provided, used as a schema hint in place of the inferred Arrow schema
    supplied_schema: Option<SchemaRef>,

    /// Policy for reading the column index
    pub(crate) column_index: PageIndexPolicy,
    /// Policy for reading the offset index
    pub(crate) offset_index: PageIndexPolicy,

    /// Options controlling how the Parquet metadata itself is decoded
    metadata_options: ParquetMetaDataOptions,

    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Virtual columns to generate while reading
    virtual_columns: Vec<FieldRef>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// If true, skip decoding the Arrow schema embedded in the file's
    /// key-value metadata (defaults to `false`)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

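    /// Provide a schema hint to use when reading the Parquet file.
    ///
    /// Setting a supplied schema also implies `skip_arrow_metadata = true`:
    /// the supplied schema takes precedence over any Arrow schema embedded in
    /// the file metadata. If the schema is not compatible with the file's
    /// schema, constructing a reader will return an error.
    ///
    /// A minimal sketch (assumes the file's single column can be read as a
    /// nullable `Int64`; the field name and type here are illustrative):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_schema::{DataType, Field, Schema};
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// let supplied = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, true)]));
    /// let options = ArrowReaderOptions::new().with_schema(supplied);
    /// ```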
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    #[deprecated(since = "57.2.0", note = "Use `with_page_index_policy` instead")]
    pub fn with_page_index(self, page_index: bool) -> Self {
        self.with_page_index_policy(PageIndexPolicy::from(page_index))
    }

    /// Set the [`PageIndexPolicy`] for both the column index and the offset index.
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        self.with_column_index_policy(policy)
            .with_offset_index_policy(policy)
    }

    /// Set the [`PageIndexPolicy`] for the column index.
    pub fn with_column_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.column_index = policy;
        self
    }

    /// Set the [`PageIndexPolicy`] for the offset index.
    pub fn with_offset_index_policy(mut self, policy: PageIndexPolicy) -> Self {
        self.offset_index = policy;
        self
    }

    /// Provide a [`SchemaDescriptor`] to use when decoding the file metadata,
    /// rather than decoding the schema from the file itself.
    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// If true, decode page encoding statistics as a compact bitmask rather
    /// than the full per-page list.
    pub fn with_encoding_stats_as_mask(mut self, val: bool) -> Self {
        self.metadata_options.set_encoding_stats_as_mask(val);
        self
    }

    /// Set the policy for decoding page encoding statistics.
    pub fn with_encoding_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_encoding_stats_policy(policy);
        self
    }

    /// Set the policy for decoding column statistics.
    pub fn with_column_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_column_stats_policy(policy);
        self
    }

    /// Set the policy for decoding size statistics.
    pub fn with_size_stats_policy(mut self, policy: ParquetStatisticsPolicy) -> Self {
        self.metadata_options.set_size_stats_policy(policy);
        self
    }

    /// Provide the file decryption properties to use when reading encrypted
    /// parquet files.
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Set the virtual columns to generate while reading.
    ///
    /// Returns an error if any of the supplied fields is not a virtual column.
    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        // Validate that all supplied fields are virtual columns
        for field in &virtual_columns {
            if !is_virtual_column(field) {
                return Err(ParquetError::General(format!(
                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                    field.name()
                )));
            }
        }
        Ok(Self {
            virtual_columns,
            ..self
        })
    }

    #[deprecated(
        since = "57.2.0",
        note = "Use `column_index_policy` or `offset_index_policy` instead"
    )]
    pub fn page_index(&self) -> bool {
        self.offset_index != PageIndexPolicy::Skip && self.column_index != PageIndexPolicy::Skip
    }

    /// Return the current [`PageIndexPolicy`] for the offset index.
    pub fn offset_index_policy(&self) -> PageIndexPolicy {
        self.offset_index
    }

    /// Return the current [`PageIndexPolicy`] for the column index.
    pub fn column_index_policy(&self) -> PageIndexPolicy {
        self.column_index
    }

    /// Return the [`ParquetMetaDataOptions`] used when reading metadata.
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    /// Return the file decryption properties, if any.
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

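/// The metadata necessary to construct an [`ArrowReaderBuilder`].
///
/// Decoding [`ParquetMetaData`] can be expensive for large files; this type
/// allows it to be done once and then shared across multiple readers.
///
/// A minimal sketch of that pattern (assumes `data: Bytes` holds a valid
/// Parquet file):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder};
/// # fn example(data: Bytes) {
/// // Decode the metadata once...
/// let metadata = ArrowReaderMetadata::load(&data, Default::default()).unwrap();
/// // ...then construct several readers without re-decoding it
/// let reader_a =
///     ParquetRecordBatchReaderBuilder::new_with_metadata(data.clone(), metadata.clone())
///         .build()
///         .unwrap();
/// let reader_b = ParquetRecordBatchReaderBuilder::new_with_metadata(data, metadata)
///     .build()
///     .unwrap();
/// # }
/// ```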
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The Parquet metadata, decoded once and shared
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow schema for the file
    pub(crate) schema: SchemaRef,
    /// The decoded Parquet field levels, if any
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    /// Load [`ArrowReaderMetadata`] from the provided [`ChunkReader`]; see
    /// [`ParquetMetaDataReader`] for details of how the metadata is decoded.
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_column_index_policy(options.column_index)
            .with_offset_index_policy(options.offset_index)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Create [`ArrowReaderMetadata`] from the provided [`ParquetMetaData`],
    /// applying the given [`ArrowReaderOptions`].
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(
                metadata,
                supplied_schema.clone(),
                &options.virtual_columns,
            ),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                    &options.virtual_columns,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
        virtual_columns: &[FieldRef],
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels_with_virtual(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
            virtual_columns,
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        // The supplied schema (plus any virtual columns) must have the same
        // number of top-level columns as the schema inferred from the file
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] for this parquet file
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

/// A newtype wrapper around a [`ChunkReader`], used by the synchronous reader
#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

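/// A synchronous builder used to construct [`ParquetRecordBatchReader`] for a file.
///
/// A minimal sketch: write a batch to an in-memory buffer, then read it back
/// with a projection (error handling elided):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// # use bytes::Bytes;
/// # use parquet::arrow::{ArrowWriter, ProjectionMask};
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// let batch = RecordBatch::try_from_iter([(
///     "a",
///     Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
/// )])
/// .unwrap();
/// let mut buffer = Vec::new();
/// let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// writer.write(&batch).unwrap();
/// writer.close().unwrap();
///
/// let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
/// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
/// let mut reader = builder.with_projection(mask).build().unwrap();
/// let read = reader.next().unwrap().unwrap();
/// assert_eq!(read.num_rows(), 3);
/// ```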
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`] from the provided
    /// [`ChunkReader`], using the default [`ArrowReaderOptions`].
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with the provided
    /// [`ArrowReaderOptions`].
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a [`ParquetRecordBatchReaderBuilder`] from the provided
    /// [`ArrowReaderMetadata`], allowing the metadata to be decoded once and
    /// reused across multiple readers.
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

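    /// Read the bloom filter for the given column chunk, if present in the file.
    ///
    /// A minimal sketch of probing a bloom filter (assumes row group 0 has a
    /// bloom filter for leaf column 0, and an illustrative value of `42_i32`):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn example(builder: ParquetRecordBatchReaderBuilder<Bytes>) {
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).unwrap() {
    ///     // `false` proves the value is absent; `true` means "maybe present"
    ///     let _maybe_present = sbbf.check(&42_i32);
    /// }
    /// # }
    /// ```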
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // Each of these enums currently has a single variant; the exhaustive
        // matches will fail to compile if new variants are ever added,
        // prompting explicit handling here.
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {
                // this is the only algorithm supported by parquet
            }
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {
                // this is the only compression supported by parquet
            }
        }
        match header.hash {
            BloomFilterHash::XXHASH => {
                // this is the only hash function supported by parquet
            }
        }

        let bitset = match column_metadata.bloom_filter_length() {
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

    /// Build a [`ParquetRecordBatchReader`] from this builder.
    ///
    /// Note: any [`RowFilter`] predicates are evaluated eagerly here.
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            // the sync reader does not use the predicate cache
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocating buffers larger than the file's row count
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Update the selection based on any filters
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // break early if no rows remain selected
                if !plan_builder.selects_any() {
                    break;
                }

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    /// Indexes of the row groups to read, in order
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        Box::new(
            self.row_groups
                .iter()
                .map(move |i| self.metadata.row_group(*i)),
        )
    }

    fn metadata(&self) -> &ParquetMetaData {
        self.metadata.as_ref()
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return the next [`SerializedPageReader`] for the given row group
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // If the offset index was read, use the page locations to avoid
        // having to decode page headers to find page boundaries
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

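/// Reads Parquet data as Arrow [`RecordBatch`]es, implementing both
/// [`Iterator`] (yielding `Result<RecordBatch, ArrowError>`) and
/// [`RecordBatchReader`].
///
/// A minimal sketch using the [`RecordBatchReader`] view (assumes
/// `data: Bytes` holds a valid Parquet file):
///
/// ```no_run
/// # use arrow_array::RecordBatchReader;
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
/// # fn example(data: Bytes) {
/// let reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
/// // The Arrow schema is available before any batches are decoded
/// println!("{:?}", reader.schema());
/// # }
/// ```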
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    /// Read the next batch, returning `Ok(None)` when the input is exhausted.
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            // Selection is represented as a boolean mask: decode whole chunks
            // and then filter the resulting batch
            RowSelectionCursor::Mask(mask_cursor) => {
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    if mask_chunk.chunk_rows == 0 {
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    // Skip over empty batches and try the next chunk
                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            // Selection is represented as skip/select runs: skip and read
            // records directly from the array reader
            RowSelectionCursor::Selectors(selectors_cursor) => {
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    // Skip empty selectors
                    if front.row_count == 0 {
                        continue;
                    }

                    // If the selector spans more rows than needed to fill this
                    // batch, split it and return the remainder to the cursor
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            // Everything is selected: read the next `batch_size` rows
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
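    /// Create a new [`ParquetRecordBatchReader`] from the provided
    /// [`ChunkReader`], producing [`RecordBatch`]es of at most `batch_size`
    /// rows.
    ///
    /// A minimal sketch (assumes `file` is a valid Parquet file; see
    /// [`ParquetRecordBatchReaderBuilder`] for more configuration options):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # fn example(file: File) {
    /// let reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
    /// let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
    /// # }
    /// ```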
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] that reads the provided
    /// [`RowGroups`] using the given [`FieldLevels`], optionally applying a
    /// [`RowSelection`].
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided
    /// [`ArrayReader`] and [`ReadPlan`]
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use rand::rngs::StdRng;
    use rand::{Rng, RngCore, SeedableRng, random, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
        RowSelector,
    };
    use crate::arrow::schema::{
        add_encoded_arrow_schema_to_metadata,
        virtual_type::{RowGroupIndex, RowNumber},
    };
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::{PageIndexPolicy, ParquetMetaData, ParquetStatisticsPolicy};
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;
    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_page_encoding_stats_mask() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new().with_encoding_stats_as_mask(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        let row_group_metadata = builder.metadata.row_group(0);

        let page_encoding_stats = row_group_metadata
            .column(0)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN));
        let page_encoding_stats = row_group_metadata
            .column(2)
            .page_encoding_stats_mask()
            .unwrap();
        assert!(page_encoding_stats.is_only(Encoding::PLAIN_DICTIONARY));
    }

    #[test]
    fn test_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages.parquet");
        let file = File::open(path).unwrap();

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_policy(ParquetStatisticsPolicy::SkipAll)
            .with_column_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.page_encoding_stats().is_none());
            assert!(column.page_encoding_stats_mask().is_none());
            assert!(column.statistics().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_encoding_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]))
            .with_column_stats_policy(ParquetStatisticsPolicy::skip_except(&[0]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert!(column.page_encoding_stats().is_none());
            assert_eq!(column.page_encoding_stats_mask().is_some(), idx == 0);
            assert_eq!(column.statistics().is_some(), idx == 0);
        }
    }

    #[test]
    fn test_size_stats_stats_skipped() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/repeated_primitive_no_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_options =
            ArrowReaderOptions::new().with_size_stats_policy(ParquetStatisticsPolicy::SkipAll);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for column in row_group_metadata.columns() {
            assert!(column.repetition_level_histogram().is_none());
            assert!(column.definition_level_histogram().is_none());
            assert!(column.unencoded_byte_array_data_bytes().is_none());
        }

        let arrow_options = ArrowReaderOptions::new()
            .with_encoding_stats_as_mask(true)
            .with_size_stats_policy(ParquetStatisticsPolicy::skip_except(&[1]));
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            arrow_options,
        )
        .unwrap();

        let row_group_metadata = builder.metadata.row_group(0);
        for (idx, column) in row_group_metadata.columns().iter().enumerate() {
            assert_eq!(column.repetition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.definition_level_histogram().is_some(), idx == 1);
            assert_eq!(column.unencoded_byte_array_data_bytes().is_some(), idx == 1);
        }
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]],
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // Roundtrip is lossless with the default writer properties
        assert_eq!(default_ret, original);

        // With coerced types, only the whole-day values roundtrip losslessly
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }

    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // Without an ArrowType hint, INT96 is read as nanosecond timestamps
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // Other TimeUnits as ArrowType hints
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // A timezone-aware ArrowType hint
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        // int96_from_spark.parquet stores timestamps as INT96 values
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // Compare raw i64 values to avoid timestamp formatting concerns
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        // Without a supplied schema, the INT96 column is read as nanosecond
        // timestamps; values outside the representable range wrap on overflow
        use arrow_schema::DataType::Timestamp;
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            // overflows i64 nanoseconds, wrapping to a negative value
            Some(-4852191831933722624),
            Some(1735599600000000000),
            None,
            // overflows i64 nanoseconds, wrapping to a negative value
            Some(-4864435138808946688),
        ]));

        // Compare raw i64 values to avoid timestamp formatting concerns
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                // Run a small test rather than the full suite, as the small
                // key types can overflow
                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );

            let data_type = ArrowDataType::Dictionary(
                Box::new(key.clone()),
                Box::new(ArrowDataType::LargeUtf8),
            );

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i64>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            // All values in this file are expected to fall within (-10, 10)
            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }

    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // Each column pair in this file should hold the same values; the
            // `*_bss` column of each pair was written with BYTE_STREAM_SPLIT
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]);
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[test]
    fn test_read_dict_fixed_size_binary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

    #[test]
    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        let struct_fields = Fields::from(vec![
            Field::new(
                "city",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt8),
                    Box::new(ArrowDataType::Utf8),
                ),
                true,
            ),
            Field::new("name", ArrowDataType::Utf8, true),
        ]);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            ArrowDataType::Struct(struct_fields.clone()),
            true,
        )]));

        let items_arr = StructArray::new(
            struct_fields,
            vec![
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                    Arc::new(StringArray::from_iter_values(vec![
                        "quebec",
                        "fredericton",
                        "halifax",
                    ])),
                )),
                Arc::new(StringArray::from_iter_values(vec![
                    "albert", "terry", "lance", "", "tim",
                ])),
            ],
            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

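    /// Options for a single-column round-trip test, controlling the shape of
    /// the generated file and how it is read back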
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows in each row group
        num_rows: usize,
        /// Batch size to read back
        record_batch_size: usize,
        /// Percentage of nulls in the column, or `None` if the column is required
        null_percent: Option<usize>,
        /// Writer batch size
        write_batch_size: usize,
        /// Maximum size of a data page
        max_data_page_size: usize,
        /// Maximum size of a dictionary page
        max_dict_page_size: usize,
        /// Parquet writer version
        writer_version: WriterVersion,
        /// Which statistics to write
        enabled_statistics: EnabledStatistics,
        /// Column encoding
        encoding: Encoding,
        /// Row selection to apply and its selected row count, if any
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter mask to apply, if any
        row_filter: Option<Vec<bool>>,
        /// Limit to apply, if any
        limit: Option<usize>,
        /// Offset to apply, if any
        offset: Option<usize>,
    }

    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

    impl TestOptions {
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = rng();
            let step = rng.random_range(self.record_batch_size..self.num_rows);
            let row_selections = create_test_selection(
                step,
                self.num_row_groups * self.num_rows,
                rng.random::<bool>(),
            );
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
                ..self
            }
        }

        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }

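    /// Reads and verifies a single leaf column for a matrix of [`TestOptions`],
    /// both writer versions, and each of the supplied `encodings`, delegating
    /// to [`single_column_reader_test`]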
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        let all_options = vec![
            // Basic row group / batch size combinations
            TestOptions::new(2, 100, 15),
            TestOptions::new(3, 25, 5),
            TestOptions::new(4, 100, 25),
            // Small page limits to force multiple pages per row group
            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
            // Required columns and columns with nulls
            TestOptions::new(2, 256, 127).with_null_percent(0),
            TestOptions::new(2, 256, 93).with_null_percent(25),
            // Limits of varying sizes, including crossing row group boundaries
            TestOptions::new(4, 100, 25).with_limit(0),
            TestOptions::new(4, 100, 25).with_limit(50),
            TestOptions::new(4, 100, 25).with_limit(10),
            TestOptions::new(4, 100, 25).with_limit(101),
            // Offsets combined with limits
            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            // Chunk-level, disabled, and all-null statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::Chunk),
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::None),
            TestOptions::new(2, 128, 91)
                .with_null_percent(100)
                .with_enabled_statistics(EnabledStatistics::None),
            // The above cases repeated with randomized row selections
            TestOptions::new(2, 100, 15).with_row_selections(),
            TestOptions::new(3, 25, 5).with_row_selections(),
            TestOptions::new(4, 100, 25).with_row_selections(),
            TestOptions::new(3, 256, 73)
                .with_max_data_page_size(128)
                .with_row_selections(),
            TestOptions::new(3, 256, 57)
                .with_max_dict_page_size(128)
                .with_row_selections(),
            TestOptions::new(2, 256, 127)
                .with_null_percent(0)
                .with_row_selections(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_offset(20)
                .with_limit(10),
            // Row filters, alone and combined with selections and tiny pages
            TestOptions::new(4, 100, 25).with_row_filter(),
            TestOptions::new(4, 100, 25)
                .with_row_selections()
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_selections()
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_enabled_statistics(EnabledStatistics::None)
                .with_max_data_page_size(10)
                .with_row_selections(),
        ];

        all_options.into_iter().for_each(|opts| {
            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                for encoding in encodings {
                    let opts = TestOptions {
                        writer_version,
                        encoding: *encoding,
                        ..opts.clone()
                    };

                    single_column_reader_test::<T, _, G>(
                        opts,
                        rand_max,
                        converted_type,
                        arrow_type.clone(),
                        &converter,
                    )
                }
            }
        });
    }

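    /// Writes a single leaf column of randomly generated `T` values according
    /// to `opts`, reads it back, and asserts the decoded batches match the
    /// expected data once any selection, filter, offset and limit are applied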
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        // Print the options to make test failures easier to diagnose
        println!(
            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
            T::get_physical_type(),
            converted_type,
            arrow_type,
            opts
        );

        // Randomly generate definition levels according to the requested null percentage
        let (repetition, def_levels) = match opts.null_percent.as_ref() {
            Some(null_percent) => {
                let mut rng = rng();

                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                    .map(|_| {
                        std::iter::from_fn(|| {
                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                        })
                        .take(opts.num_rows)
                        .collect()
                    })
                    .collect();
                (Repetition::OPTIONAL, Some(def_levels))
            }
            None => (Repetition::REQUIRED, None),
        };

        // Generate values only for the non-null slots
        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
            .map(|idx| {
                let null_count = match def_levels.as_ref() {
                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                    None => 0,
                };
                G::gen_vec(rand_max, opts.num_rows - null_count)
            })
            .collect();

        let len = match T::get_physical_type() {
            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
            crate::basic::Type::INT96 => 12,
            _ => -1,
        };

        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", T::get_physical_type())
                .with_repetition(repetition)
                .with_converted_type(converted_type)
                .with_length(len)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

        let mut file = tempfile::tempfile().unwrap();

        generate_single_column_file_with_data::<T>(
            &values,
            def_levels.as_ref(),
            file.try_clone().unwrap(),
            schema,
            arrow_field,
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let options = ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::from(
            opts.enabled_statistics == EnabledStatistics::Page,
        ));

        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

        let expected_data = match opts.row_selections {
            Some((selections, row_count)) => {
                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

                let mut skip_data: Vec<Option<T::T>> = vec![];
                let deque: VecDeque<RowSelector> = selections.clone().into();
                for select in deque {
                    if select.skip {
                        without_skip_data.drain(0..select.row_count);
                    } else {
                        skip_data.extend(without_skip_data.drain(0..select.row_count));
                    }
                }
                builder = builder.with_row_selection(selections);

                assert_eq!(skip_data.len(), row_count);
                skip_data
            }
            None => {
                // Without a selection, all generated values are expected
                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
                expected_data
            }
        };

        let mut expected_data = match opts.row_filter {
            Some(filter) => {
                let expected_data = expected_data
                    .into_iter()
                    .zip(filter.iter())
                    .filter_map(|(d, f)| f.then_some(d))
                    .collect();

                let mut filter_offset = 0;
                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                    ProjectionMask::all(),
                    move |b| {
                        let array = BooleanArray::from_iter(
                            filter
                                .iter()
                                .skip(filter_offset)
                                .take(b.num_rows())
                                .map(|x| Some(*x)),
                        );
                        filter_offset += b.num_rows();
                        Ok(array)
                    },
                ))]);

                builder = builder.with_row_filter(filter);
                expected_data
            }
            None => expected_data,
        };

        if let Some(offset) = opts.offset {
            builder = builder.with_offset(offset);
            expected_data = expected_data.into_iter().skip(offset).collect();
        }

        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            expected_data = expected_data.into_iter().take(limit).collect();
        }

        let mut record_reader = builder
            .with_batch_size(opts.record_batch_size)
            .build()
            .unwrap();

        let mut total_read = 0;
        loop {
            let maybe_batch = record_reader.next();
            if total_read < expected_data.len() {
                let end = min(total_read + opts.record_batch_size, expected_data.len());
                let batch = maybe_batch.unwrap().unwrap();
                assert_eq!(end - total_read, batch.num_rows());

                let a = converter(&expected_data[total_read..end]);
                let b = batch.column(0);

                assert_eq!(a.data_type(), b.data_type());
                assert_eq!(a.to_data(), b.to_data());
                assert_eq!(
                    a.as_any().type_id(),
                    b.as_any().type_id(),
                    "incorrect type ids"
                );

                total_read = end;
            } else {
                assert!(maybe_batch.is_none());
                break;
            }
        }
    }

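    /// Builds the expected row values by interleaving the generated values
    /// with nulls according to the definition levels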
    fn gen_expected_data<T: DataType>(
        def_levels: Option<&Vec<Vec<i16>>>,
        values: &[Vec<T::T>],
    ) -> Vec<Option<T::T>> {
        let data: Vec<Option<T::T>> = match def_levels {
            Some(levels) => {
                let mut values_iter = values.iter().flatten();
                levels
                    .iter()
                    .flatten()
                    .map(|d| match d {
                        1 => Some(values_iter.next().cloned().unwrap()),
                        0 => None,
                        _ => unreachable!(),
                    })
                    .collect()
            }
            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
        };
        data
    }

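    /// Writes one row group per entry in `values` (with optional definition
    /// levels), embedding the Arrow `field` in the file metadata when provided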
    fn generate_single_column_file_with_data<T: DataType>(
        values: &[Vec<T::T>],
        def_levels: Option<&Vec<Vec<i16>>>,
        file: File,
        schema: TypePtr,
        field: Option<Field>,
        opts: &TestOptions,
    ) -> Result<ParquetMetaData> {
        let mut writer_props = opts.writer_props();
        if let Some(field) = field {
            let arrow_schema = Schema::new(vec![field]);
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
        }

        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

        for (idx, v) in values.iter().enumerate() {
            let def_levels = def_levels.map(|d| d[idx].as_slice());
            let mut row_group_writer = writer.next_row_group()?;
            {
                let mut column_writer = row_group_writer
                    .next_column()?
                    .expect("Column writer is none!");

                column_writer
                    .typed::<T>()
                    .write_batch(v, def_levels, None)?;

                column_writer.close()?;
            }
            row_group_writer.close()?;
        }

        writer.close()
    }

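    /// Opens a file from the arrow test data directory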
    fn get_test_file(file_name: &str) -> File {
        let mut path = PathBuf::new();
        path.push(arrow::util::test_util::arrow_test_data());
        path.push(file_name);

        File::open(path.as_path()).expect("File not found!")
    }

    #[test]
    fn test_read_structs() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }

    #[test]
    fn test_read_structs_by_name() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::columns(
            builder.parquet_schema(),
            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
        );
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }

    #[test]
    fn test_read_maps() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_maps.snappy.parquet");
        let file = File::open(path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }
    }

    #[test]
    fn test_nested_nullability() {
        let message_type = "message nested {
            OPTIONAL Group group {
                REQUIRED INT32 leaf;
            }
        }";

        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        {
            let mut writer =
                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                    .unwrap();

            {
                let mut row_group_writer = writer.next_row_group().unwrap();
                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

                column_writer
                    .typed::<Int32Type>()
                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                    .unwrap();

                column_writer.close().unwrap();
                row_group_writer.close().unwrap();
            }

            writer.close().unwrap();
        }

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

        let reader = builder.with_projection(mask).build().unwrap();

        let expected_schema = Schema::new(vec![Field::new(
            "group",
            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
            true,
        )]);

        let batch = reader.into_iter().next().unwrap().unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.column(0).null_count(), 2);
    }

    #[test]
    fn test_invalid_utf8() {
        // A parquet file containing a single string column with invalid UTF-8 bytes
        let data = vec![
            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
            82, 49,
        ];

        let file = Bytes::from(data);
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

        let error = record_batch_reader.next().unwrap().unwrap_err();

        assert!(
            error.to_string().contains("invalid utf-8 sequence"),
            "{}",
            error
        );
    }

    #[test]
    fn test_invalid_utf8_string_array() {
        test_invalid_utf8_string_array_inner::<i32>();
    }

    #[test]
    fn test_invalid_utf8_large_string_array() {
        test_invalid_utf8_string_array_inner::<i64>();
    }

    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
        let cases = [
            invalid_utf8_first_char::<O>(),
            invalid_utf8_first_char_long_strings::<O>(),
            invalid_utf8_later_char::<O>(),
            invalid_utf8_later_char_long_strings::<O>(),
            invalid_utf8_later_char_really_long_strings::<O>(),
            invalid_utf8_later_char_really_long_strings2::<O>(),
        ];
        for array in &cases {
            for encoding in STRING_ENCODINGS {
                // Reinterpret the binary data as a string array without
                // validation, so the reader must catch the invalid UTF-8
                let array = unsafe {
                    GenericStringArray::<O>::new_unchecked(
                        array.offsets().clone(),
                        array.values().clone(),
                        array.nulls().cloned(),
                    )
                };
                let data_type = array.data_type().clone();
                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
                let err = read_from_parquet(data).unwrap_err();
                let expected_err =
                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
                assert!(
                    err.to_string().contains(expected_err),
                    "data type: {data_type}, expected: {expected_err}, got: {err}"
                );
            }
        }
    }

    #[test]
    fn test_invalid_utf8_string_view_array() {
        let cases = [
            invalid_utf8_first_char::<i32>(),
            invalid_utf8_first_char_long_strings::<i32>(),
            invalid_utf8_later_char::<i32>(),
            invalid_utf8_later_char_long_strings::<i32>(),
            invalid_utf8_later_char_really_long_strings::<i32>(),
            invalid_utf8_later_char_really_long_strings2::<i32>(),
        ];

        for encoding in STRING_ENCODINGS {
            for array in &cases {
                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
                let array = array.as_binary_view();

                // Reinterpret the binary view data as a string view without
                // validation, so the reader must catch the invalid UTF-8
                let array = unsafe {
                    StringViewArray::new_unchecked(
                        array.views().clone(),
                        array.data_buffers().to_vec(),
                        array.nulls().cloned(),
                    )
                };

                let data_type = array.data_type().clone();
                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
                let err = read_from_parquet(data).unwrap_err();
                let expected_err =
                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
                assert!(
                    err.to_string().contains(expected_err),
                    "data type: {data_type}, expected: {expected_err}, got: {err}"
                );
            }
        }
    }

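    /// Encodings exercised by the invalid UTF-8 tests; `None` leaves the
    /// writer's default encoding in place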
    const STRING_ENCODINGS: &[Option<Encoding>] = &[
        None,
        Some(Encoding::PLAIN),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
    ];

    /// Invalid UTF-8 bytes in the first character
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

    /// Invalid UTF-8 bytes after several valid characters
    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];

    /// Binary array whose final value starts with an invalid UTF-8 byte
    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let invalid = INVALID_UTF8_FIRST_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
    }

    /// As [`invalid_utf8_first_char`], but with the invalid bytes at the end of
    /// a string longer than 12 bytes (too long to be inlined in a view)
    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// Binary array whose final value contains an invalid UTF-8 byte after
    /// several valid characters
    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
    }

    /// As [`invalid_utf8_later_char`], but on a string longer than 12 bytes
    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// As [`invalid_utf8_later_char`], but on a much longer string
    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        for _ in 0..10 {
            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        }
        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// As above, but the long string is valid and a short value carries the
    /// invalid bytes
    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut valid_long = vec![];
        for _ in 0..10 {
            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        }
        let invalid = INVALID_UTF8_LATER_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![
            None,
            Some(valid),
            Some(invalid),
            None,
            Some(&valid_long),
            Some(valid),
        ])
    }

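    /// Writes `array` as a single column "c", forcing `encoding` (with
    /// dictionary encoding disabled) when one is supplied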
    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
        let mut data = vec![];
        let schema = batch.schema();
        let props = encoding.map(|encoding| {
            WriterProperties::builder()
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .build()
        });

        {
            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
            writer.write(&batch).unwrap();
            writer.flush().unwrap();
            writer.close().unwrap();
        }
        data
    }

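    /// Reads all record batches back from an in-memory parquet file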
    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
            .unwrap()
            .build()
            .unwrap();

        reader.collect()
    }

    #[test]
    fn test_dictionary_preservation() {
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }

    #[test]
    fn test_read_null_list() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        let batch = record_batch_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.column(0).len(), 1);

        let list = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(list.len(), 1);
        assert!(list.is_valid(0));

        let val = list.value(0);
        assert_eq!(val.len(), 0);
    }

    #[test]
    fn test_null_schema_inference() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_field = Field::new(
            "emptylist",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
            true,
        );

        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
        let schema = builder.schema();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0), &arrow_field);
    }

    #[test]
    fn test_skip_metadata() {
        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
        let field = Field::new("col", col.data_type().clone(), true);

        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

        let metadata = [("key".to_string(), "value".to_string())]
            .into_iter()
            .collect();

        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

        assert_ne!(schema_with_metadata, schema_without_metadata);

        let batch =
            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

        let file = |version: WriterVersion| {
            let props = WriterProperties::builder()
                .set_writer_version(version)
                .build();

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
            file
        };

        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

        let v1_reader = file(WriterVersion::PARQUET_1_0);
        let v2_reader = file(WriterVersion::PARQUET_2_0);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }

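    /// Writes the supplied (name, array) pairs to a temporary parquet file
    /// and returns the open file handle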
    fn write_parquet_from_iter<I, F>(value: I) -> File
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let batch = RecordBatch::try_from_iter(value).unwrap();
        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    }

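    /// Writes the supplied columns, then asserts that opening the file with
    /// the override `schema` fails with `expected_error`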
    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let file = write_parquet_from_iter(value);
        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options_with_schema,
        );
        assert_eq!(builder.err().unwrap().to_string(), expected_error);
    }

    #[test]
    fn test_schema_too_few_columns() {
        run_schema_test_with_error(
            vec![
                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![Field::new(
                "int64",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
        );
    }

    #[test]
    fn test_schema_too_many_columns() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![
                Field::new("int64", ArrowDataType::Int64, false),
                Field::new("int32", ArrowDataType::Int32, false),
            ])),
            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
        );
    }

    #[test]
    fn test_schema_mismatched_column_names() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![Field::new(
                "other",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected field named int64 got other",
        );
    }

    #[test]
    fn test_schema_incompatible_columns() {
        run_schema_test_with_error(
            vec![
                (
                    "col1_invalid",
                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col2_valid",
                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col3_invalid",
                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
                ),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1_invalid", ArrowDataType::Int32, false),
                Field::new("col2_valid", ArrowDataType::Int32, false),
                Field::new("col3_invalid", ArrowDataType::Int32, false),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
        );
    }

    #[test]
    fn test_one_incompatible_nested_column() {
        let nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]);
        let nested = StructArray::try_new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ],
            None,
        )
        .expect("struct array");
        let supplied_nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int32, false),
        ]);
        run_schema_test_with_error(
            vec![
                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
                ("nested", Arc::new(nested) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1", ArrowDataType::Int64, false),
                Field::new("col2", ArrowDataType::Int32, false),
                Field::new(
                    "nested",
                    ArrowDataType::Struct(supplied_nested_fields),
                    false,
                ),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
             requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
             but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
        );
    }

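    /// Returns an in-memory parquet file containing a single non-null Utf8
    /// column named "column1"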
    fn utf8_parquet() -> Bytes {
        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
        let props = None;
        let mut parquet_data = vec![];
        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        Bytes::from(parquet_data)
    }

    #[test]
    fn test_schema_error_bad_types() {
        let parquet_data = utf8_parquet();

        // Request an Int32 schema for what is actually a Utf8 column
        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
            "column1",
            arrow::datatypes::DataType::Int32,
            false,
        )]));

        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
        let err =
            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
                .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
        )
    }

    #[test]
    fn test_schema_error_bad_nullability() {
        let parquet_data = utf8_parquet();

        // Request a nullable schema for what is actually a non-null column
        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
            "column1",
            arrow::datatypes::DataType::Utf8,
            true,
        )]));

        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
        let err =
            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
                .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
        )
    }

    #[test]
    fn test_read_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![
            (
                "binary_to_utf8",
                Arc::new(BinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "large_binary_to_large_utf8",
                Arc::new(LargeBinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "binary_view_to_utf8_view",
                Arc::new(BinaryViewArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
        ]);
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
            Field::new(
                "large_binary_to_large_utf8",
                ArrowDataType::LargeUtf8,
                false,
            ),
            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_string::<i32>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch
                .column(1)
                .as_string::<i64>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );
    }

    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        arrow_reader.next().unwrap().unwrap_err();
    }

    #[test]
    fn test_with_schema() {
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }

    #[test]
    fn test_empty_projection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let file_metadata = builder.metadata().file_metadata();
        let expected_rows = file_metadata.num_rows() as usize;

        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
        let batch_reader = builder
            .with_projection(mask)
            .with_batch_size(2)
            .build()
            .unwrap();

        let mut total_rows = 0;
        for maybe_batch in batch_reader {
            let batch = maybe_batch.unwrap();
            total_rows += batch.num_rows();
            assert_eq!(batch.num_columns(), 0);
            assert!(batch.num_rows() <= 2);
        }

        assert_eq!(total_rows, expected_rows);
    }

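    /// Writes two batches of `batch_size` list rows using the given row group
    /// size, then reads them back with the same batch size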
    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
            true,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(
            &mut buf,
            schema.clone(),
            Some(
                WriterProperties::builder()
                    .set_max_row_group_size(row_group_size)
                    .build(),
            ),
        )
        .unwrap();
        for _ in 0..2 {
            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
            for _ in 0..(batch_size) {
                list_builder.append(true);
            }
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
                .unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let mut record_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
    }

4664 #[test]
4665 fn test_row_group_exact_multiple() {
4666 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
4667 test_row_group_batch(8, 8);
4668 test_row_group_batch(10, 8);
4669 test_row_group_batch(8, 10);
4670 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
4671 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
4672 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
4673 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
4674 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
4675 }
4676
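    // Reference implementation for the selection tests: slice `column`
    // according to `selection`, chunked into batches of at most `batch_size`
    // rows, to compute the batches a correct reader should emit.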
    fn get_expected_batches(
        column: &RecordBatch,
        selection: &RowSelection,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut expected_batches = vec![];

        let mut selection: VecDeque<_> = selection.clone().into();
        let mut row_offset = 0;
        let mut last_start = None;
        while row_offset < column.num_rows() && !selection.is_empty() {
            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
            while batch_remaining > 0 && !selection.is_empty() {
                let (to_read, skip) = match selection.front_mut() {
                    Some(selection) if selection.row_count > batch_remaining => {
                        selection.row_count -= batch_remaining;
                        (batch_remaining, selection.skip)
                    }
                    Some(_) => {
                        let select = selection.pop_front().unwrap();
                        (select.row_count, select.skip)
                    }
                    None => break,
                };

                batch_remaining -= to_read;

                match skip {
                    true => {
                        if let Some(last_start) = last_start.take() {
                            expected_batches.push(column.slice(last_start, row_offset - last_start))
                        }
                        row_offset += to_read
                    }
                    false => {
                        last_start.get_or_insert(row_offset);
                        row_offset += to_read
                    }
                }
            }
        }

        if let Some(last_start) = last_start.take() {
            expected_batches.push(column.slice(last_start, row_offset - last_start))
        }

        // Sanity check: every batch except the last should be full-size
        for batch in &expected_batches[..expected_batches.len() - 1] {
            assert_eq!(batch.num_rows(), batch_size);
        }

        expected_batches
    }

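    // Builds a RowSelection of alternating select/skip runs of `step_len` rows
    // (starting with a skip when `skip_first` is set) covering `total_len`
    // rows, and returns it along with the number of selected rows.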
    fn create_test_selection(
        step_len: usize,
        total_len: usize,
        skip_first: bool,
    ) -> (RowSelection, usize) {
        let mut remaining = total_len;
        let mut skip = skip_first;
        let mut vec = vec![];
        let mut selected_count = 0;
        while remaining != 0 {
            let step = if remaining > step_len {
                step_len
            } else {
                remaining
            };
            vec.push(RowSelector {
                row_count: step,
                skip,
            });
            remaining -= step;
            if !skip {
                selected_count += step;
            }
            skip = !skip;
        }
        (vec.into(), selected_count)
    }

    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        // Note: `selection_len` only appears in the assertion message; the
        // selection itself is always built from `batch_size`-sized runs
        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        do_test(1000, 1000);
        do_test(20, 20);
        do_test(20, 5);
        do_test(20, 5);

        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options =
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }

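    // The read plan should clamp the requested batch size to the number of
    // rows in the file instead of allocating for all 1024 requested rows.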
    #[test]
    fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let num_rows = builder.metadata.file_metadata().num_rows();
        let reader = builder
            .with_batch_size(1024)
            .with_projection(ProjectionMask::all())
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
    }

    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            // `alltypes_tiny_pages.parquet` has a page index
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            // `alltypes_plain.parquet` has no page index: none is loaded and
            // the read still succeeds
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index_policy(PageIndexPolicy::Required),
            )
            .unwrap();
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // eventType
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // category: two repeated values within a single row
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // filter.error
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        // Projecting each leaf individually must yield the same data as the
        // corresponding column of the unprojected read
        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }

    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
    }

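    // Both files decode to the same rows: one uses the Hadoop LZ4 framing, the
    // other raw LZ4 block compression, presumably exercising the decoder's
    // fallback between the two framings.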
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

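    // Row selection over nested list data: skipping a row must skip all of its
    // repeated leaf values, so the selected row should equal the corresponding
    // slice of a full sequential read.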
    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        // Write with one row per page so the skip crosses page boundaries
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

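    // Decimal round-trip helpers. The writer stores each decimal column in the
    // narrowest Parquet physical type its precision fits: precision <= 9 maps
    // to INT32, precision <= 18 to INT64, and anything wider to
    // FIXED_LEN_BYTE_ARRAY, which is what the physical-type assertions check.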
    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal64_roundtrip() {
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    fn test_decimal_roundtrip<T: DecimalType>() {
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

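    // Selection over list columns: skip the first 100 rows of each 1024-row
    // writer batch and check that every surviving list still holds exactly one
    // value and that the values resume at index 100.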
    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

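    // Fuzzes the same property on list-of-list data with random nulls: for
    // every selection and batch size, each selected run must equal the
    // corresponding slice of the original batch.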
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

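    // `old_list_structure.parquet` appears to use the legacy two-level list
    // layout (a repeated field named "array"); the reader should still
    // reassemble it into the nested ListArray [[1, 2], [3, 4]] built below.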
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::{DataType, ToByteSlice};

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        // Expected inner value: the list [[1, 2], [3, 4]]
        let a_values = Int32Array::from(vec![1, 2, 3, 4]);
        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        // `map_no_value.parquet` stores a map column whose value field is
        // absent; its entries should read back identically to the file's
        // equivalent list columns
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

    #[test]
    fn test_read_unknown_logical_type() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/unknown-logical-type.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
            .expect("Error creating reader builder");

        let schema = builder.metadata().file_metadata().schema_descr();
        assert_eq!(
            schema.column(0).logical_type_ref(),
            Some(&LogicalType::String)
        );
        assert_eq!(
            schema.column(1).logical_type_ref(),
            Some(&LogicalType::_Unknown { field_id: 2555 })
        );
        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 2);
    }

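    // Virtual column tests: a field tagged with the `RowNumber` extension type
    // is not read from the file but synthesized by the reader, yielding each
    // row's absolute position within the file as an Int64 column appended
    // after the projected file columns.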
    #[test]
    fn test_read_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );
        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        // Project away all file columns, leaving only the virtual column
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![row_number_field]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_row_numbers_row_group_order() -> Result<()> {
        // Write 100 values (5000..5100) split into two row groups of 50 rows
        let array = Int64Array::from_iter_values(5000..5100);
        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(50)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
            writer.write(&batch_chunk)?;
        }
        writer.close()?;

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let buffer = Bytes::from(buffer);

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;

        // Reading in file order yields row numbers 0..100
        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
                .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5000..5100).collect(),
                row_numbers: (0..100).collect()
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        // Reading row groups out of order keeps each row's original number
        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
            .with_row_groups(vec![1, 0])
            .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5050..5100).chain(5000..5050).collect(),
                row_numbers: (50..100).chain(0..50).collect(),
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        Ok(())
    }

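    // Gathers a reader's "col" and "row_number" columns into plain vectors so
    // the tests above can compare an entire file with a single assert_eq!.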
    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        values: Vec<i64>,
        row_numbers: Vec<i64>,
    }

    impl ValuesAndRowNumbers {
        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
            let mut values = vec![];
            let mut row_numbers = vec![];
            for batch in reader {
                let batch = batch.expect("Could not read batch");
                values.extend(
                    batch
                        .column_by_name("col")
                        .expect("Could not get col column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get value")),
                );

                row_numbers.extend(
                    batch
                        .column_by_name("row_number")
                        .expect("Could not get row_number column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get row number")),
                );
            }
            Self {
                values,
                row_numbers,
            }
        }
    }

    #[test]
    fn test_with_virtual_columns_rejects_non_virtual_fields() {
        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
        assert_eq!(
            ArrowReaderOptions::new()
                .with_virtual_columns(vec![regular_field])
                .unwrap_err()
                .to_string(),
            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups() {
        test_row_numbers_with_multiple_row_groups_helper(
            false,
            |path, selection, _row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups_and_filter() {
        test_row_numbers_with_multiple_row_groups_helper(
            true,
            |path, selection, row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .with_row_filter(row_filter.expect("No filter"))
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

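    // `RowGroupIndex` is the other virtual column exercised here: for every
    // row it yields the file-order index of the row group the row came from,
    // even when row groups are read out of order.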
    #[test]
    fn test_read_row_group_indices() {
        // Three row groups of two rows each
        let array1 = Int64Array::from(vec![1, 2]);
        let array2 = Int64Array::from(vec![3, 4]);
        let array3 = Int64Array::from(vec![5, 6]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();
        let batch3 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array3) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(2)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.write(&batch3).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let mut arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options)
                .expect("reader builder with virtual columns")
                .build()
                .expect("reader with virtual columns");

        let batch = arrow_reader.next().unwrap().unwrap();

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 6);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3), Some(4), Some(5), Some(6)]
        );

        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(1), Some(1), Some(2), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_group_indices() {
        let array1 = Int64Array::from(vec![1, 2, 3]);
        let array2 = Int64Array::from(vec![4, 5]);

        let batch1 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array1) as ArrayRef)]).unwrap();
        let batch2 =
            RecordBatch::try_from_iter(vec![("value", Arc::new(array2) as ArrayRef)]).unwrap();

        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(3)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch1.schema(), Some(options)).unwrap();
        writer.write(&batch1).unwrap();
        writer.write(&batch2).unwrap();
        writer.close().unwrap();

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("row_group_index", ArrowDataType::Int64, false)
                .with_extension_type(RowGroupIndex),
        );

        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_group_index_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with virtual columns only");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![(*row_group_index_field).clone()]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 5);

        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1), Some(1)]
        );
    }

    #[test]
    fn test_read_row_group_indices_with_selection() -> Result<()> {
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(10)
            .build();

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]));

        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(options))?;

        // Three row groups of ten rows each: 0..10, 10..20, 20..30
        for i in 0..3 {
            let start = i * 10;
            let array = Int64Array::from_iter_values(start..start + 10);
            let batch = RecordBatch::try_from_iter(vec![("value", Arc::new(array) as ArrayRef)])?;
            writer.write(&batch)?;
        }
        writer.close()?;

        let file = Bytes::from(buffer);
        let row_group_index_field = Arc::new(
            Field::new("rg_idx", ArrowDataType::Int64, false).with_extension_type(RowGroupIndex),
        );

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_group_index_field])?;

        // Read the row groups in reverse order
        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file.clone(), options.clone())?
                .with_row_groups(vec![2, 1, 0])
                .build()?;

        let batches: Vec<_> = arrow_reader.collect::<Result<Vec<_>, _>>()?;
        let combined = concat_batches(&batches[0].schema(), &batches)?;

        let values = combined.column(0).as_primitive::<types::Int64Type>();
        let first_val = values.value(0);
        let last_val = values.value(combined.num_rows() - 1);
        // First row comes from row group 2, last row from row group 0
        assert_eq!(first_val, 20);
        assert_eq!(last_val, 9);

        // The virtual column reports the original (file-order) indices
        let rg_indices = combined.column(1).as_primitive::<types::Int64Type>();
        assert_eq!(rg_indices.value(0), 2);
        assert_eq!(rg_indices.value(10), 1);
        assert_eq!(rg_indices.value(20), 0);

        Ok(())
    }

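    // Randomized end-to-end check for row numbers: generate a file with
    // multiple row groups where each value equals its row number + 1, apply a
    // random selection (and optionally a random ~99%-pass filter), and verify
    // that every returned row_number matches its value.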
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        // Build a random selection covering every row in the file
        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        // Optionally add a filter that passes ~99% of rows
        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");
        // Column 0 is "value", column 1 is the virtual row number
        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        // Values were written as row_number + 1
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }

    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )])));

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");

        // Write values 1..=N in randomly sized batches, flushing after each
        // write so the file ends up with multiple row groups
        let mut values = 1..=rng.random_range(1..4096);
        while !values.is_empty() {
            let batch_values = values
                .by_ref()
                .take(rng.random_range(1..4096))
                .collect::<Vec<_>>();
            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
            let batch =
                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
            writer.write(&batch).expect("Could not write batch");
            writer.flush().expect("Could not flush");
        }
        let metadata = writer.close().expect("Could not close writer");

        (Bytes::from(buf), metadata)
    }
}