use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType as ArrowType, FieldRef, Schema, SchemaRef};
use arrow_select::filter::filter_record_batch;
pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
pub use selection::{RowSelection, RowSelectionCursor, RowSelectionPolicy, RowSelector};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

pub use crate::arrow::array_reader::RowGroups;
use crate::arrow::array_reader::{ArrayReader, ArrayReaderBuilder};
use crate::arrow::schema::{
    ParquetField, parquet_to_arrow_schema_and_fields, virtual_type::is_virtual_column,
};
use crate::arrow::{FieldLevels, ProjectionMask, parquet_to_arrow_field_levels_with_virtual};
use crate::basic::{BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash};
use crate::bloom_filter::{
    SBBF_HEADER_SIZE_ESTIMATE, Sbbf, chunk_read_bloom_filter_header_and_offset,
};
use crate::column::page::{PageIterator, PageReader};
#[cfg(feature = "encryption")]
use crate::encryption::decrypt::FileDecryptionProperties;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    PageIndexPolicy, ParquetMetaData, ParquetMetaDataOptions, ParquetMetaDataReader,
    RowGroupMetaData,
};
use crate::file::reader::{ChunkReader, SerializedPageReader};
use crate::schema::types::SchemaDescriptor;

use crate::arrow::arrow_reader::metrics::ArrowReaderMetrics;
pub use read_plan::{ReadPlan, ReadPlanBuilder};

mod filter;
pub mod metrics;
mod read_plan;
pub(crate) mod selection;
pub mod statistics;

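/// Common builder state for reading Parquet data into Arrow [`RecordBatch`]es,
/// shared by the synchronous and asynchronous readers and parameterized over
/// the input type `T`.
///
/// A minimal sketch of the typical builder flow, via the
/// [`ParquetRecordBatchReaderBuilder`] alias defined below (assuming `data` is
/// an in-memory buffer containing a complete Parquet file):
///
/// ```no_run
/// # use bytes::Bytes;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(data: Bytes) -> parquet::errors::Result<()> {
/// let reader = ParquetRecordBatchReaderBuilder::try_new(data)?
///     .with_batch_size(8192)
///     .build()?;
/// for batch in reader {
///     println!("read {} rows", batch?.num_rows());
/// }
/// # Ok(())
/// # }
/// ```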
pub struct ArrowReaderBuilder<T> {
    /// The input from which Parquet data is read, e.g. a file or byte buffer
    pub(crate) input: T,

    /// The decoded Parquet footer metadata
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// The Arrow schema of the output
    pub(crate) schema: SchemaRef,

    /// The Parquet to Arrow field mapping, if any columns are read
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// The maximum number of rows per [`RecordBatch`]
    pub(crate) batch_size: usize,

    /// Indices of the row groups to read; `None` reads all of them
    pub(crate) row_groups: Option<Vec<usize>>,

    /// The columns to project
    pub(crate) projection: ProjectionMask,

    /// An optional filter evaluated while reading
    pub(crate) filter: Option<RowFilter>,

    /// An optional pre-computed selection of rows to read
    pub(crate) selection: Option<RowSelection>,

    /// How row selections are materialized (selector runs or boolean mask)
    pub(crate) row_selection_policy: RowSelectionPolicy,

    /// An optional limit on the number of rows returned
    pub(crate) limit: Option<usize>,

    /// An optional number of rows to skip before returning any
    pub(crate) offset: Option<usize>,

    /// Metrics collected while reading, if enabled
    pub(crate) metrics: ArrowReaderMetrics,

    /// Maximum size, in bytes, of the cache used when evaluating predicates
    pub(crate) max_predicate_cache_size: usize,
}

impl<T: Debug> Debug for ArrowReaderBuilder<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrowReaderBuilder<T>")
            .field("input", &self.input)
            .field("metadata", &self.metadata)
            .field("schema", &self.schema)
            .field("fields", &self.fields)
            .field("batch_size", &self.batch_size)
            .field("row_groups", &self.row_groups)
            .field("projection", &self.projection)
            .field("filter", &self.filter)
            .field("selection", &self.selection)
            .field("row_selection_policy", &self.row_selection_policy)
            .field("limit", &self.limit)
            .field("offset", &self.offset)
            .field("metrics", &self.metrics)
            .finish()
    }
}

impl<T> ArrowReaderBuilder<T> {
    pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self {
            input,
            metadata: metadata.metadata,
            schema: metadata.schema,
            fields: metadata.fields,
            batch_size: 1024,
            row_groups: None,
            projection: ProjectionMask::all(),
            filter: None,
            selection: None,
            row_selection_policy: RowSelectionPolicy::default(),
            limit: None,
            offset: None,
            metrics: ArrowReaderMetrics::Disabled,
            max_predicate_cache_size: 100 * 1024 * 1024,
        }
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] of the output
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Set the size of [`RecordBatch`] to produce. Defaults to 1024.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        // Clamp to the number of rows in the file to avoid allocating
        // buffers larger than necessary
        let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
        Self { batch_size, ..self }
    }

    /// Only read data from the provided row group indexes
    pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
        Self {
            row_groups: Some(row_groups),
            ..self
        }
    }

    /// Only read data from the columns in the provided [`ProjectionMask`]
    pub fn with_projection(self, mask: ProjectionMask) -> Self {
        Self {
            projection: mask,
            ..self
        }
    }

    /// Set the [`RowSelectionPolicy`] used to materialize row selections
    pub fn with_row_selection_policy(self, policy: RowSelectionPolicy) -> Self {
        Self {
            row_selection_policy: policy,
            ..self
        }
    }

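    /// Scan only the rows in the provided [`RowSelection`].
    ///
    /// A small sketch of building a selection from [`RowSelector`]s (the row
    /// counts are illustrative): skip the first 100 rows, then read the next
    /// 200.
    ///
    /// ```
    /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    ///
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(100),
    ///     RowSelector::select(200),
    /// ]);
    /// ```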
    pub fn with_row_selection(self, selection: RowSelection) -> Self {
        Self {
            selection: Some(selection),
            ..self
        }
    }

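    /// Provide a [`RowFilter`] of [`ArrowPredicate`]s, evaluated in order
    /// while reading so that rows failing a predicate are skipped.
    ///
    /// A hedged sketch using [`ArrowPredicateFn`]; it assumes the first leaf
    /// column is `Int32` and keeps rows where that column is greater than 42:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use arrow_array::{Int32Array, RecordBatch};
    /// # use arrow_array::cast::AsArray;
    /// # use arrow::compute::kernels::cmp::gt;
    /// # use parquet::arrow::ProjectionMask;
    /// # use parquet::arrow::arrow_reader::{
    /// #     ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
    /// # };
    /// # fn example(builder: ParquetRecordBatchReaderBuilder<File>) -> parquet::errors::Result<()> {
    /// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    /// let predicate = ArrowPredicateFn::new(mask, |batch: RecordBatch| {
    ///     let column = batch.column(0).as_primitive::<arrow_array::types::Int32Type>();
    ///     gt(column, &Int32Array::new_scalar(42))
    /// });
    /// let reader = builder
    ///     .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```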
    pub fn with_row_filter(self, filter: RowFilter) -> Self {
        Self {
            filter: Some(filter),
            ..self
        }
    }

    /// Limit the number of rows returned, applied after any row selection,
    /// filter, and offset
    pub fn with_limit(self, limit: usize) -> Self {
        Self {
            limit: Some(limit),
            ..self
        }
    }

    /// Skip the first `offset` selected rows, applied after any row
    /// selection and filter but before any limit
    pub fn with_offset(self, offset: usize) -> Self {
        Self {
            offset: Some(offset),
            ..self
        }
    }

    /// Provide an [`ArrowReaderMetrics`] to collect metrics while reading
    pub fn with_metrics(self, metrics: ArrowReaderMetrics) -> Self {
        Self { metrics, ..self }
    }

    /// Set the maximum size, in bytes, of the cache used when evaluating
    /// [`RowFilter`] predicates
    pub fn with_max_predicate_cache_size(self, max_predicate_cache_size: usize) -> Self {
        Self {
            max_predicate_cache_size,
            ..self
        }
    }
}

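/// Options that control how Parquet files are read into Arrow, such as
/// whether to read the page index, supply an external schema, or append
/// virtual columns.
///
/// A small sketch enabling the page index (page-level statistics that can
/// speed up filtered reads):
///
/// ```
/// use parquet::arrow::arrow_reader::ArrowReaderOptions;
///
/// let options = ArrowReaderOptions::new().with_page_index(true);
/// assert!(options.page_index());
/// ```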
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// If true, ignore any Arrow schema embedded in the file's metadata
    skip_arrow_metadata: bool,
    /// An optional user-supplied Arrow schema to decode into
    supplied_schema: Option<SchemaRef>,
    /// Policy for reading the page index
    pub(crate) page_index_policy: PageIndexPolicy,
    /// Options controlling how the Parquet metadata is decoded
    metadata_options: ParquetMetaDataOptions,
    /// Properties for decrypting encrypted files
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<Arc<FileDecryptionProperties>>,

    /// Virtual columns (e.g. row numbers) to append to the output
    virtual_columns: Vec<FieldRef>,
}

impl ArrowReaderOptions {
    /// Create a new [`ArrowReaderOptions`] with the default settings
    pub fn new() -> Self {
        Self::default()
    }

    /// Skip decoding the Arrow metadata embedded in the file (defaults to
    /// `false`)
    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
        Self {
            skip_arrow_metadata,
            ..self
        }
    }

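    /// Provide the Arrow schema to decode into, overriding the schema
    /// inferred from the file. The supplied schema must be compatible with
    /// the file's schema or constructing a reader will fail. Implies
    /// `skip_arrow_metadata`.
    ///
    /// A sketch supplying a timestamp resolution for a column (it assumes
    /// `file` contains a single compatible column named "a", e.g. INT96 data
    /// written by Spark):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use std::sync::Arc;
    /// # use arrow_schema::{DataType, Field, Schema, TimeUnit};
    /// # use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
    /// # fn example(file: File) -> parquet::errors::Result<()> {
    /// let supplied = Arc::new(Schema::new(vec![Field::new(
    ///     "a",
    ///     DataType::Timestamp(TimeUnit::Microsecond, None),
    ///     true,
    /// )]));
    /// let options = ArrowReaderOptions::new().with_schema(supplied);
    /// let reader =
    ///     ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?.build()?;
    /// # Ok(())
    /// # }
    /// ```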
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            supplied_schema: Some(schema),
            skip_arrow_metadata: true,
            ..self
        }
    }

    /// Enable or disable reading the page index, if present (defaults to
    /// disabled)
    pub fn with_page_index(self, page_index: bool) -> Self {
        let page_index_policy = PageIndexPolicy::from(page_index);

        Self {
            page_index_policy,
            ..self
        }
    }

    /// Set the [`PageIndexPolicy`] used when reading the page index
    pub fn with_page_index_policy(self, policy: PageIndexPolicy) -> Self {
        Self {
            page_index_policy: policy,
            ..self
        }
    }

    /// Provide a previously decoded parquet [`SchemaDescriptor`], avoiding
    /// re-decoding the schema from the footer
    pub fn with_parquet_schema(mut self, schema: Arc<SchemaDescriptor>) -> Self {
        self.metadata_options.set_schema(schema);
        self
    }

    /// Provide the file decryption properties to use when reading encrypted
    /// parquet files
    #[cfg(feature = "encryption")]
    pub fn with_file_decryption_properties(
        self,
        file_decryption_properties: Arc<FileDecryptionProperties>,
    ) -> Self {
        Self {
            file_decryption_properties: Some(file_decryption_properties),
            ..self
        }
    }

    /// Set the virtual columns to append to the output.
    ///
    /// Returns an error if any field is not a virtual column (its extension
    /// type name must start with `arrow.virtual.`)
    pub fn with_virtual_columns(self, virtual_columns: Vec<FieldRef>) -> Result<Self> {
        // Validate that all fields are virtual columns before accepting them
        for field in &virtual_columns {
            if !is_virtual_column(field) {
                return Err(ParquetError::General(format!(
                    "Field '{}' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'",
                    field.name()
                )));
            }
        }
        Ok(Self {
            virtual_columns,
            ..self
        })
    }

    /// Returns `true` if the page index will be read, i.e. the policy is not
    /// [`PageIndexPolicy::Skip`]
    pub fn page_index(&self) -> bool {
        self.page_index_policy != PageIndexPolicy::Skip
    }

    /// Returns the metadata options
    pub fn metadata_options(&self) -> &ParquetMetaDataOptions {
        &self.metadata_options
    }

    /// Returns the file decryption properties, if any
    #[cfg(feature = "encryption")]
    pub fn file_decryption_properties(&self) -> Option<&Arc<FileDecryptionProperties>> {
        self.file_decryption_properties.as_ref()
    }
}

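/// The metadata necessary to construct Arrow readers: the decoded Parquet
/// footer, the Arrow schema, and the Parquet to Arrow field mapping.
///
/// Loading this once and cloning it (the contents are behind cheaply
/// clonable `Arc`s) avoids re-parsing the footer when creating several
/// readers over the same file. A sketch:
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::arrow_reader::{
/// #     ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
/// # };
/// # fn example(file: File) -> parquet::errors::Result<()> {
/// let metadata = ArrowReaderMetadata::load(&file, ArrowReaderOptions::new())?;
/// let _builder_a = ParquetRecordBatchReaderBuilder::new_with_metadata(
///     file.try_clone()?,
///     metadata.clone(),
/// );
/// let _builder_b = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata);
/// # Ok(())
/// # }
/// ```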
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// The decoded Parquet footer metadata
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow schema of the output
    pub(crate) schema: SchemaRef,
    /// The Parquet to Arrow field mapping
    pub(crate) fields: Option<Arc<ParquetField>>,
}

impl ArrowReaderMetadata {
    /// Load [`ArrowReaderMetadata`] from the provided [`ChunkReader`] using
    /// the given options
    pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ParquetMetaDataReader::new()
            .with_page_index_policy(options.page_index_policy)
            .with_metadata_options(Some(options.metadata_options.clone()));
        #[cfg(feature = "encryption")]
        let metadata = metadata.with_decryption_properties(
            options.file_decryption_properties.as_ref().map(Arc::clone),
        );
        let metadata = metadata.parse_and_finish(reader)?;
        Self::try_new(Arc::new(metadata), options)
    }

    /// Create [`ArrowReaderMetadata`] from previously loaded
    /// [`ParquetMetaData`]
    pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
        match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(
                metadata,
                supplied_schema.clone(),
                &options.virtual_columns,
            ),
            None => {
                let kv_metadata = match options.skip_arrow_metadata {
                    true => None,
                    false => metadata.file_metadata().key_value_metadata(),
                };

                let (schema, fields) = parquet_to_arrow_schema_and_fields(
                    metadata.file_metadata().schema_descr(),
                    ProjectionMask::all(),
                    kv_metadata,
                    &options.virtual_columns,
                )?;

                Ok(Self {
                    metadata,
                    schema: Arc::new(schema),
                    fields: fields.map(Arc::new),
                })
            }
        }
    }

    fn with_supplied_schema(
        metadata: Arc<ParquetMetaData>,
        supplied_schema: SchemaRef,
        virtual_columns: &[FieldRef],
    ) -> Result<Self> {
        let parquet_schema = metadata.file_metadata().schema_descr();
        let field_levels = parquet_to_arrow_field_levels_with_virtual(
            parquet_schema,
            ProjectionMask::all(),
            Some(supplied_schema.fields()),
            virtual_columns,
        )?;
        let fields = field_levels.fields;
        let inferred_len = fields.len();
        let supplied_len = supplied_schema.fields().len() + virtual_columns.len();
        // Ensure the supplied schema has the same number of columns as the
        // schema inferred from the file, plus any virtual columns
        if inferred_len != supplied_len {
            return Err(arrow_err!(format!(
                "Incompatible supplied Arrow schema: expected {} columns received {}",
                inferred_len, supplied_len
            )));
        }

        // Collect all mismatches rather than failing on the first, so the
        // error message describes every incompatible field
        let mut errors = Vec::new();

        let field_iter = supplied_schema.fields().iter().zip(fields.iter());

        for (field1, field2) in field_iter {
            if field1.data_type() != field2.data_type() {
                errors.push(format!(
                    "data type mismatch for field {}: requested {} but found {}",
                    field1.name(),
                    field1.data_type(),
                    field2.data_type()
                ));
            }
            if field1.is_nullable() != field2.is_nullable() {
                errors.push(format!(
                    "nullability mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.is_nullable(),
                    field2.is_nullable()
                ));
            }
            if field1.metadata() != field2.metadata() {
                errors.push(format!(
                    "metadata mismatch for field {}: expected {:?} but found {:?}",
                    field1.name(),
                    field1.metadata(),
                    field2.metadata()
                ));
            }
        }

        if !errors.is_empty() {
            let message = errors.join(", ");
            return Err(ParquetError::ArrowError(format!(
                "Incompatible supplied Arrow schema: {message}",
            )));
        }

        Ok(Self {
            metadata,
            schema: supplied_schema,
            fields: field_levels.levels.map(Arc::new),
        })
    }

    /// Returns a reference to the [`ParquetMetaData`] for this parquet file
    pub fn metadata(&self) -> &Arc<ParquetMetaData> {
        &self.metadata
    }

    /// Returns the parquet [`SchemaDescriptor`] for this parquet file
    pub fn parquet_schema(&self) -> &SchemaDescriptor {
        self.metadata.file_metadata().schema_descr()
    }

    /// Returns the Arrow [`SchemaRef`] of the output
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }
}

/// A synchronous [`ChunkReader`] input used by
/// [`ParquetRecordBatchReaderBuilder`]
#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

impl<T: Debug + ChunkReader> Debug for SyncReader<T> {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("SyncReader").field(&self.0).finish()
    }
}

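/// A synchronous builder used to construct a [`ParquetRecordBatchReader`]
/// from anything implementing [`ChunkReader`], such as a [`std::fs::File`]
/// or [`bytes::Bytes`].
///
/// A sketch of an end-to-end projected read (it assumes the file has at
/// least one leaf column):
///
/// ```no_run
/// # use std::fs::File;
/// # use parquet::arrow::ProjectionMask;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(file: File) -> parquet::errors::Result<()> {
/// let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
/// let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
/// let reader = builder
///     .with_projection(mask)
///     .with_batch_size(1024)
///     .build()?;
/// for batch in reader {
///     let batch = batch?;
///     // process the projected batch
/// }
/// # Ok(())
/// # }
/// ```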
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;

impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Create a new [`ParquetRecordBatchReaderBuilder`] with default options
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Create a new [`ParquetRecordBatchReaderBuilder`] with the provided
    /// [`ArrowReaderOptions`]
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Create a [`ParquetRecordBatchReaderBuilder`] from previously loaded
    /// [`ArrowReaderMetadata`], allowing the metadata to be reused without
    /// re-parsing the footer
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

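    /// Read the bloom filter for the given row group and column, returning
    /// `Ok(None)` if none is present.
    ///
    /// A sketch of probing the filter (the indices and probe value are
    /// illustrative; a bloom filter may return false positives but never
    /// false negatives):
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn example(builder: ParquetRecordBatchReaderBuilder<File>) -> parquet::errors::Result<()> {
    /// if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0)? {
    ///     if !sbbf.check(&"foo") {
    ///         // "foo" is definitely not present in row group 0, column 0
    ///     }
    /// }
    /// # Ok(())
    /// # }
    /// ```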
    pub fn get_row_group_column_bloom_filter(
        &self,
        row_group_idx: usize,
        column_idx: usize,
    ) -> Result<Option<Sbbf>> {
        let metadata = self.metadata.row_group(row_group_idx);
        let column_metadata = metadata.column(column_idx);

        let offset: u64 = if let Some(offset) = column_metadata.bloom_filter_offset() {
            offset
                .try_into()
                .map_err(|_| ParquetError::General("Bloom filter offset is invalid".to_string()))?
        } else {
            return Ok(None);
        };

        // If the filter length is known, read the header and bitset together;
        // otherwise read an estimated header prefix first
        let buffer = match column_metadata.bloom_filter_length() {
            Some(length) => self.input.0.get_bytes(offset, length as usize),
            None => self.input.0.get_bytes(offset, SBBF_HEADER_SIZE_ESTIMATE),
        }?;

        let (header, bitset_offset) =
            chunk_read_bloom_filter_header_and_offset(offset, buffer.clone())?;

        // Each of these enums currently has a single variant; matching on
        // them ensures this code is revisited if new variants are added
        match header.algorithm {
            BloomFilterAlgorithm::BLOCK => {}
        }
        match header.compression {
            BloomFilterCompression::UNCOMPRESSED => {}
        }
        match header.hash {
            BloomFilterHash::XXHASH => {}
        }

        let bitset = match column_metadata.bloom_filter_length() {
            // The buffer already contains the bitset; slice off the header
            Some(_) => buffer.slice(
                (TryInto::<usize>::try_into(bitset_offset).unwrap()
                    - TryInto::<usize>::try_into(offset).unwrap())..,
            ),
            // Otherwise issue a second read for the bitset itself
            None => {
                let bitset_length: usize = header.num_bytes.try_into().map_err(|_| {
                    ParquetError::General("Bloom filter length is invalid".to_string())
                })?;
                self.input.0.get_bytes(bitset_offset, bitset_length)?
            }
        };
        Ok(Some(Sbbf::new(&bitset)))
    }

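    /// Build a [`ParquetRecordBatchReader`], consuming this builder.
    ///
    /// Note that any offset is applied after selections and filters, and any
    /// limit is applied after the offset. A sketch reading only rows 100..200
    /// of a file:
    ///
    /// ```no_run
    /// # use std::fs::File;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    /// # fn example(file: File) -> parquet::errors::Result<()> {
    /// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?
    ///     .with_offset(100)
    ///     .with_limit(100)
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```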
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        let Self {
            input,
            metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            mut filter,
            selection,
            row_selection_policy,
            limit,
            offset,
            metrics,
            max_predicate_cache_size: _,
        } = self;

        // Try to avoid allocating buffers larger than the number of rows in
        // the file
        let batch_size = batch_size.min(metadata.file_metadata().num_rows() as usize);

        let row_groups = row_groups.unwrap_or_else(|| (0..metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(input.0),
            metadata,
            row_groups,
        };

        let mut plan_builder = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .with_row_selection_policy(row_selection_policy);

        // Evaluate any filter predicates, narrowing the row selection as we go
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // Stop early if the current selection already selects no rows
                if !plan_builder.selects_any() {
                    break;
                }

                let mut cache_projection = predicate.projection().clone();
                cache_projection.intersect(&projection);

                let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
                    .with_parquet_metadata(&reader.metadata)
                    .build_array_reader(fields.as_deref(), predicate.projection())?;

                plan_builder = plan_builder.with_predicate(array_reader, predicate.as_mut())?;
            }
        }

        let array_reader = ArrayReaderBuilder::new(&reader, &metrics)
            .with_parquet_metadata(&reader.metadata)
            .build_array_reader(fields.as_deref(), &projection)?;

        let read_plan = plan_builder
            .limited(reader.num_rows())
            .with_offset(offset)
            .with_limit(limit)
            .build_limited()
            .build();

        Ok(ParquetRecordBatchReader::new(array_reader, read_plan))
    }
}

/// A [`RowGroups`] implementation backed by a [`ChunkReader`]
struct ReaderRowGroups<T: ChunkReader> {
    reader: Arc<T>,

    metadata: Arc<ParquetMetaData>,
    /// Indices of the row groups to read, in order
    row_groups: Vec<usize>,
}

impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
    fn num_rows(&self) -> usize {
        let meta = self.metadata.row_groups();
        self.row_groups
            .iter()
            .map(|x| meta[*x].num_rows() as usize)
            .sum()
    }

    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
        Ok(Box::new(ReaderPageIterator {
            column_idx: i,
            reader: self.reader.clone(),
            metadata: self.metadata.clone(),
            row_groups: self.row_groups.clone().into_iter(),
        }))
    }

    fn row_groups(&self) -> Box<dyn Iterator<Item = &RowGroupMetaData> + '_> {
        Box::new(
            self.row_groups
                .iter()
                .map(move |i| self.metadata.row_group(*i)),
        )
    }

    fn metadata(&self) -> &ParquetMetaData {
        self.metadata.as_ref()
    }
}

struct ReaderPageIterator<T: ChunkReader> {
    reader: Arc<T>,
    column_idx: usize,
    row_groups: std::vec::IntoIter<usize>,
    metadata: Arc<ParquetMetaData>,
}

impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Return the next [`SerializedPageReader`] for the given row group
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // The offset index may be present but empty for this row group;
        // filter that case out to avoid indexing into an empty slice
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}

impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
    type Item = Result<Box<dyn PageReader>>;

    fn next(&mut self) -> Option<Self::Item> {
        let rg_idx = self.row_groups.next()?;
        let page_reader = self
            .next_page_reader(rg_idx)
            .map(|page_reader| Box::new(page_reader) as _);
        Some(page_reader)
    }
}

impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}

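/// An `Iterator<Item = Result<RecordBatch, ArrowError>>` that yields the
/// [`RecordBatch`]es read from a Parquet source, also implementing
/// [`RecordBatchReader`] to expose the output schema.
///
/// A sketch of draining a reader into a single batch, assuming the data fits
/// in memory (`concat_batches` comes from `arrow_select`):
///
/// ```no_run
/// # use std::fs::File;
/// # use arrow_array::RecordBatchReader;
/// # use arrow_select::concat::concat_batches;
/// # use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
/// # fn example(file: File) -> parquet::errors::Result<()> {
/// let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;
/// let schema = reader.schema();
/// let batches = reader.collect::<Result<Vec<_>, _>>()?;
/// let combined = concat_batches(&schema, &batches)?;
/// println!("read {} rows", combined.num_rows());
/// # Ok(())
/// # }
/// ```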
pub struct ParquetRecordBatchReader {
    array_reader: Box<dyn ArrayReader>,
    schema: SchemaRef,
    read_plan: ReadPlan,
}

impl Debug for ParquetRecordBatchReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ParquetRecordBatchReader")
            .field("array_reader", &"...")
            .field("schema", &self.schema)
            .field("read_plan", &self.read_plan)
            .finish()
    }
}

impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_inner()
            .map_err(|arrow_err| arrow_err.into())
            .transpose()
    }
}

impl ParquetRecordBatchReader {
    /// Read the next batch, returning `Ok(None)` when the reader is exhausted
    fn next_inner(&mut self) -> Result<Option<RecordBatch>> {
        let mut read_records = 0;
        let batch_size = self.batch_size();
        if batch_size == 0 {
            return Ok(None);
        }
        match self.read_plan.row_selection_cursor_mut() {
            // Selection materialized as a boolean mask: decode whole chunks
            // and then filter the decoded batch down to the selected rows
            RowSelectionCursor::Mask(mask_cursor) => {
                while !mask_cursor.is_empty() {
                    let Some(mask_chunk) = mask_cursor.next_mask_chunk(batch_size) else {
                        return Ok(None);
                    };

                    // Skip any leading unselected rows
                    if mask_chunk.initial_skip > 0 {
                        let skipped = self.array_reader.skip_records(mask_chunk.initial_skip)?;
                        if skipped != mask_chunk.initial_skip {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                mask_chunk.initial_skip,
                                skipped
                            ));
                        }
                    }

                    if mask_chunk.chunk_rows == 0 {
                        if mask_cursor.is_empty() && mask_chunk.selected_rows == 0 {
                            return Ok(None);
                        }
                        continue;
                    }

                    let mask = mask_cursor.mask_values_for(&mask_chunk)?;

                    let read = self.array_reader.read_records(mask_chunk.chunk_rows)?;
                    if read == 0 {
                        return Err(general_err!(
                            "reached end of column while expecting {} rows",
                            mask_chunk.chunk_rows
                        ));
                    }
                    if read != mask_chunk.chunk_rows {
                        return Err(general_err!(
                            "insufficient rows read from array reader - expected {}, got {}",
                            mask_chunk.chunk_rows,
                            read
                        ));
                    }

                    let array = self.array_reader.consume_batch()?;
                    let struct_array = array.as_struct_opt().ok_or_else(|| {
                        ArrowError::ParquetError(
                            "Struct array reader should return struct array".to_string(),
                        )
                    })?;

                    let filtered_batch =
                        filter_record_batch(&RecordBatch::from(struct_array), &mask)?;

                    if filtered_batch.num_rows() != mask_chunk.selected_rows {
                        return Err(general_err!(
                            "filtered rows mismatch selection - expected {}, got {}",
                            mask_chunk.selected_rows,
                            filtered_batch.num_rows()
                        ));
                    }

                    // Every row in this chunk was filtered out; try the next chunk
                    if filtered_batch.num_rows() == 0 {
                        continue;
                    }

                    return Ok(Some(filtered_batch));
                }
            }
            // Selection materialized as alternating skip/select runs
            RowSelectionCursor::Selectors(selectors_cursor) => {
                while read_records < batch_size && !selectors_cursor.is_empty() {
                    let front = selectors_cursor.next_selector();
                    if front.skip {
                        let skipped = self.array_reader.skip_records(front.row_count)?;

                        if skipped != front.row_count {
                            return Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            ));
                        }
                        continue;
                    }

                    if front.row_count == 0 {
                        continue;
                    }

                    // Try to read the full selector, splitting it if it would
                    // overflow the current batch
                    let need_read = batch_size - read_records;
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            // Return the unread remainder to the cursor for
                            // the next batch
                            selectors_cursor.return_selector(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read)? {
                        0 => break,
                        rec => read_records += rec,
                    };
                }
            }
            // No selection: simply read the next `batch_size` rows
            RowSelectionCursor::All => {
                self.array_reader.read_records(batch_size)?;
            }
        };

        let array = self.array_reader.consume_batch()?;
        let struct_array = array.as_struct_opt().ok_or_else(|| {
            ArrowError::ParquetError("Struct array reader should return struct array".to_string())
        })?;

        Ok(if struct_array.len() > 0 {
            Some(RecordBatch::from(struct_array))
        } else {
            None
        })
    }
}

impl RecordBatchReader for ParquetRecordBatchReader {
    /// Returns the schema of the [`RecordBatch`]es produced by this reader
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl ParquetRecordBatchReader {
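    /// Create a new [`ParquetRecordBatchReader`] from the provided reader
    /// with the given batch size and default options.
    ///
    /// A sketch reading an in-memory buffer in batches of up to 1024 rows:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
    /// # fn example(data: Bytes) -> parquet::errors::Result<()> {
    /// for batch in ParquetRecordBatchReader::try_new(data, 1024)? {
    ///     let batch = batch?;
    ///     // process batch
    /// }
    /// # Ok(())
    /// # }
    /// ```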
    pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
        ParquetRecordBatchReaderBuilder::try_new(reader)?
            .with_batch_size(batch_size)
            .build()
    }

    /// Create a new [`ParquetRecordBatchReader`] from the provided
    /// [`RowGroups`] and previously computed [`FieldLevels`].
    ///
    /// Note: this is a low-level interface; see also [`Self::try_new`]
    pub fn try_new_with_row_groups(
        levels: &FieldLevels,
        row_groups: &dyn RowGroups,
        batch_size: usize,
        selection: Option<RowSelection>,
    ) -> Result<Self> {
        let metrics = ArrowReaderMetrics::disabled();
        let array_reader = ArrayReaderBuilder::new(row_groups, &metrics)
            .with_parquet_metadata(row_groups.metadata())
            .build_array_reader(levels.levels.as_ref(), &ProjectionMask::all())?;

        let read_plan = ReadPlanBuilder::new(batch_size)
            .with_selection(selection)
            .build();

        Ok(Self {
            array_reader,
            schema: Arc::new(Schema::new(levels.fields.clone())),
            read_plan,
        })
    }

    /// Create a new [`ParquetRecordBatchReader`] that reads from the
    /// provided [`ArrayReader`] according to the given [`ReadPlan`]
    pub(crate) fn new(array_reader: Box<dyn ArrayReader>, read_plan: ReadPlan) -> Self {
        let schema = match array_reader.get_data_type() {
            ArrowType::Struct(fields) => Schema::new(fields.clone()),
            _ => unreachable!("Struct array reader's data type is not struct!"),
        };

        Self {
            array_reader,
            schema: Arc::new(schema),
            read_plan,
        }
    }

    #[inline(always)]
    pub(crate) fn batch_size(&self) -> usize {
        self.read_plan.batch_size()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::cmp::min;
    use std::collections::{HashMap, VecDeque};
    use std::fmt::Formatter;
    use std::fs::File;
    use std::io::Seek;
    use std::path::PathBuf;
    use std::sync::Arc;

    use rand::rngs::StdRng;
    use rand::{Rng, RngCore, SeedableRng, random, rng};
    use tempfile::tempfile;

    use crate::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions,
        ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder, RowFilter, RowSelection,
        RowSelectionPolicy, RowSelector,
    };
    use crate::arrow::schema::{add_encoded_arrow_schema_to_metadata, virtual_type::RowNumber};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::basic::{ConvertedType, Encoding, LogicalType, Repetition, Type as PhysicalType};
    use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
    use crate::data_type::{
        BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
        FloatType, Int32Type, Int64Type, Int96, Int96Type,
    };
    use crate::errors::Result;
    use crate::file::metadata::ParquetMetaData;
    use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
    use crate::file::writer::SerializedFileWriter;
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{Type, TypePtr};
    use crate::util::test_common::rand_gen::RandGen;
    use arrow::compute::kernels::cmp::eq;
    use arrow::compute::or;
    use arrow_array::builder::*;
    use arrow_array::cast::AsArray;
    use arrow_array::types::{
        Date32Type, Date64Type, Decimal32Type, Decimal64Type, Decimal128Type, Decimal256Type,
        DecimalType, Float16Type, Float32Type, Float64Type, Time32MillisecondType,
        Time64MicrosecondType,
    };
    use arrow_array::*;
    use arrow_buffer::{ArrowNativeType, Buffer, IntervalDayTime, NullBuffer, i256};
    use arrow_data::{ArrayData, ArrayDataBuilder};
    use arrow_schema::{
        ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
    };
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use half::f16;
    use num_traits::PrimInt;

    #[test]
    fn test_arrow_reader_all_columns() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());
        let reader = builder.build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(original_schema.fields(), reader.schema().fields());
    }

    #[test]
    fn test_reuse_schema() {
        let file = get_test_file("parquet/alltypes-java.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file.try_clone().unwrap()).unwrap();
        let expected = builder.metadata;
        let schema = expected.file_metadata().schema_descr_ptr();

        // Re-use the previously decoded parquet schema for the second builder
        let arrow_options = ArrowReaderOptions::new().with_parquet_schema(schema.clone());
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, arrow_options).unwrap();

        assert_eq!(expected.as_ref(), builder.metadata.as_ref());
    }

    #[test]
    fn test_arrow_reader_single_column() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_arrow_reader_single_column_by_name() {
        let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let original_schema = Arc::clone(builder.schema());

        let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
        let reader = builder.with_projection(mask).build().unwrap();

        // Verify that the schema was correctly parsed
        assert_eq!(1, reader.schema().fields().len());
        assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
    }

    #[test]
    fn test_null_column_reader_test() {
        let mut file = tempfile::tempfile().unwrap();

        let schema = "
            message message {
                OPTIONAL INT32 int32;
            }
        ";
        let schema = Arc::new(parse_message_type(schema).unwrap());

        let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
        generate_single_column_file_with_data::<Int32Type>(
            &[vec![], vec![]], // no values, as the column is entirely null
            Some(&def_levels),
            file.try_clone().unwrap(), // clone the handle so it can be rewound below
            schema,
            Some(Field::new("int32", ArrowDataType::Null, true)),
            &Default::default(),
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
        let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();

        assert_eq!(batches.len(), 4);
        for batch in &batches[0..3] {
            assert_eq!(batch.num_rows(), 2);
            assert_eq!(batch.num_columns(), 1);
            assert_eq!(batch.column(0).null_count(), 2);
        }

        assert_eq!(batches[3].num_rows(), 1);
        assert_eq!(batches[3].num_columns(), 1);
        assert_eq!(batches[3].column(0).null_count(), 1);
    }

    #[test]
    fn test_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<BoolType, _, BoolType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        );
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
                Encoding::BYTE_STREAM_SPLIT,
            ],
        );
        run_single_column_reader_tests::<FloatType, _, FloatType>(
            2,
            ConvertedType::NONE,
            None,
            |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
            &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
        );
    }

    #[test]
    fn test_unsigned_primitive_single_column_reader_test() {
        run_single_column_reader_tests::<Int32Type, _, Int32Type>(
            2,
            ConvertedType::UINT_32,
            Some(ArrowDataType::UInt32),
            |vals| {
                Arc::new(UInt32Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u32)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
        run_single_column_reader_tests::<Int64Type, _, Int64Type>(
            2,
            ConvertedType::UINT_64,
            Some(ArrowDataType::UInt64),
            |vals| {
                Arc::new(UInt64Array::from_iter(
                    vals.iter().map(|x| x.map(|x| x as u64)),
                ))
            },
            &[
                Encoding::PLAIN,
                Encoding::RLE_DICTIONARY,
                Encoding::DELTA_BINARY_PACKED,
            ],
        );
    }

    #[test]
    fn test_unsigned_roundtrip() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("uint32", ArrowDataType::UInt32, true),
            Field::new("uint64", ArrowDataType::UInt64, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt32Array::from_iter_values([
                    0,
                    i32::MAX as u32,
                    u32::MAX,
                ])),
                Arc::new(UInt64Array::from_iter_values([
                    0,
                    i64::MAX as u64,
                    u64::MAX,
                ])),
            ],
        )
        .unwrap();

        writer.write(&original).unwrap();
        writer.close().unwrap();

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
        let ret = reader.next().unwrap().unwrap();
        assert_eq!(ret, original);

        // Check they can be downcast to the expected unsigned types
        ret.column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .unwrap();

        ret.column(1)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
    }

    #[test]
    fn test_float16_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("float16", ArrowDataType::Float16, false),
            Field::new("float16-nullable", ArrowDataType::Float16, true),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Float16Array::from_iter_values([
                    f16::EPSILON,
                    f16::MIN,
                    f16::MAX,
                    f16::NAN,
                    f16::INFINITY,
                    f16::NEG_INFINITY,
                    f16::ONE,
                    f16::NEG_ONE,
                    f16::ZERO,
                    f16::NEG_ZERO,
                    f16::E,
                    f16::PI,
                    f16::FRAC_1_PI,
                ])),
                Arc::new(Float16Array::from(vec![
                    None,
                    None,
                    None,
                    Some(f16::NAN),
                    Some(f16::INFINITY),
                    Some(f16::NEG_INFINITY),
                    None,
                    None,
                    None,
                    None,
                    None,
                    None,
                    Some(f16::FRAC_1_PI),
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Check the concrete array types round-tripped
        ret.column(0).as_primitive::<Float16Type>();
        ret.column(1).as_primitive::<Float16Type>();

        Ok(())
    }

    #[test]
    fn test_time_utc_roundtrip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time_millis",
                ArrowDataType::Time32(TimeUnit::Millisecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
            Field::new(
                "time_micros",
                ArrowDataType::Time64(TimeUnit::Microsecond),
                true,
            )
            .with_metadata(HashMap::from_iter(vec![(
                "adjusted_to_utc".to_string(),
                "".to_string(),
            )])),
        ]));

        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Time32MillisecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399_000),
                    Some(86_400_000),
                    Some(86_401_000),
                    None,
                ])),
                Arc::new(Time64MicrosecondArray::from(vec![
                    Some(-1),
                    Some(0),
                    Some(86_399 * 1_000_000),
                    Some(86_400 * 1_000_000),
                    Some(86_401 * 1_000_000),
                    None,
                ])),
            ],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Check the concrete array types round-tripped
        ret.column(0).as_primitive::<Time32MillisecondType>();
        ret.column(1).as_primitive::<Time64MicrosecondType>();

        Ok(())
    }

    #[test]
    fn test_date32_roundtrip() -> Result<()> {
        use arrow_array::Date32Array;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "date32",
            ArrowDataType::Date32,
            false,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

        let original = RecordBatch::try_new(
            schema,
            vec![Arc::new(Date32Array::from(vec![
                -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
            ]))],
        )?;

        writer.write(&original)?;
        writer.close()?;

        let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
        let ret = reader.next().unwrap()?;
        assert_eq!(ret, original);

        // Check the concrete array type round-tripped
        ret.column(0).as_primitive::<Date32Type>();

        Ok(())
    }

    #[test]
    fn test_date64_roundtrip() -> Result<()> {
        use arrow_array::Date64Array;

        let schema = Arc::new(Schema::new(vec![
            Field::new("small-date64", ArrowDataType::Date64, false),
            Field::new("big-date64", ArrowDataType::Date64, false),
            Field::new("invalid-date64", ArrowDataType::Date64, false),
        ]));

        let mut default_buf = Vec::with_capacity(1024);
        let mut coerce_buf = Vec::with_capacity(1024);

        let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

        let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
        let mut coerce_writer =
            ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

        static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

        let original = RecordBatch::try_new(
            schema,
            vec![
                // small-date64: whole days that also fit in an i32 day count
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000 * NUM_MILLISECONDS_IN_DAY,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // big-date64: whole days outside the i32 day range
                Arc::new(Date64Array::from(vec![
                    -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    0,
                    1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                    10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
                ])),
                // invalid-date64: values that are not a whole number of days
                Arc::new(Date64Array::from(vec![
                    -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1,
                    1_000 * NUM_MILLISECONDS_IN_DAY + 1,
                    1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
                ])),
            ],
        )?;

        default_writer.write(&original)?;
        coerce_writer.write(&original)?;

        default_writer.close()?;
        coerce_writer.close()?;

        let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
        let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

        let default_ret = default_reader.next().unwrap()?;
        let coerce_ret = coerce_reader.next().unwrap()?;

        // Without coerce_types, all columns round-trip exactly
        assert_eq!(default_ret, original);

        // With coerce_types, only whole, in-range day values survive unchanged
        assert_eq!(coerce_ret.column(0), original.column(0));
        assert_ne!(coerce_ret.column(1), original.column(1));
        assert_ne!(coerce_ret.column(2), original.column(2));

        // Check the concrete array types round-tripped
        default_ret.column(0).as_primitive::<Date64Type>();
        coerce_ret.column(0).as_primitive::<Date64Type>();

        Ok(())
    }
    struct RandFixedLenGen {}

    impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
        fn r#gen(len: i32) -> FixedLenByteArray {
            let mut v = vec![0u8; len as usize];
            rng().fill_bytes(&mut v);
            ByteArray::from(v).into()
        }
    }

    #[test]
    fn test_fixed_length_binary_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            20,
            ConvertedType::NONE,
            None,
            |vals| {
                let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
                for val in vals {
                    match val {
                        Some(b) => builder.append_value(b).unwrap(),
                        None => builder.append_null(),
                    }
                }
                Arc::new(builder.finish())
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_interval_day_time_column_reader() {
        run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
            12,
            ConvertedType::INTERVAL,
            None,
            |vals| {
                Arc::new(
                    vals.iter()
                        .map(|x| {
                            x.as_ref().map(|b| IntervalDayTime {
                                days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
                                milliseconds: i32::from_le_bytes(
                                    b.as_ref()[8..12].try_into().unwrap(),
                                ),
                            })
                        })
                        .collect::<IntervalDayTimeArray>(),
                )
            },
            &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
        );
    }

    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // With no type hint, INT96 is read as nanosecond timestamps
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // Explicit resolution hints
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // A timezone-aware hint
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }

    #[test]
    fn test_int96_from_spark_file_with_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        // Read an INT96 column written by Spark, supplying an explicit
        // microsecond timestamp schema rather than the default nanoseconds
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let supplied_schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            Timestamp(TimeUnit::Microsecond, None),
            true,
        )]));
        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());

        let mut record_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .build()
                .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Microsecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456),
            Some(1704070800000000),
            Some(253402225200000000),
            Some(1735599600000000),
            None,
            Some(9089380393200000000),
        ]));

        // Cast the timestamp column to Int64 to compare the raw values
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    #[test]
    fn test_int96_from_spark_file_without_provided_schema() {
        use arrow_schema::DataType::Timestamp;
        // Without a supplied schema, INT96 is read as nanosecond timestamps;
        // values outside the representable nanosecond range wrap around
        let test_data = arrow::util::test_util::parquet_test_data();
        let path = format!("{test_data}/int96_from_spark.parquet");
        let file = File::open(path).unwrap();

        let mut record_reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        let column = batch.column(0);
        assert_eq!(column.data_type(), &Timestamp(TimeUnit::Nanosecond, None));

        let expected = Arc::new(Int64Array::from(vec![
            Some(1704141296123456000),
            Some(1704070800000000000),
            Some(-4852191831933722624), // wrapped: exceeds the i64 nanosecond range
            Some(1735599600000000000),
            None,
            Some(-4864435138808946688), // wrapped: exceeds the i64 nanosecond range
        ]));

        // Cast the timestamp column to Int64 to compare the raw values
        let binding = arrow_cast::cast(batch.column(0), &arrow_schema::DataType::Int64).unwrap();
        let casted_timestamps = binding.as_primitive::<types::Int64Type>();

        assert_eq!(casted_timestamps.len(), expected.len());

        casted_timestamps
            .iter()
            .zip(expected.iter())
            .for_each(|(lhs, rhs)| {
                assert_eq!(lhs, rhs);
            });
    }

    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn r#gen(len: i32) -> ByteArray {
            Int32Type::r#gen(len).to_string().as_str().into()
        }
    }

    #[test]
    fn test_utf8_single_column_reader_test() {
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }

    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }

    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }

    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }

    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }

    #[test]
    fn test_read_float16_zeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/float16_zeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 3);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ZERO);
        assert!(col.value(1).is_sign_positive());
        assert!(col.value(2).is_nan());
    }

    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            // All values in the file are known to lie in (-10, 10)
            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }
2477
    #[test]
    fn test_read_extended_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // columns come in pairs that must decode to identical values; the
            // second column of each pair is BYTE_STREAM_SPLIT encoded
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }

    #[test]
    fn test_read_incorrect_map_schema_file() {
        let testdata = arrow::util::test_util::parquet_test_data();
        // file declares a map whose schema deviates from the parquet spec; it
        // should still read back with a normalized "key_value" entry
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]);
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }

    #[test]
    fn test_read_dict_fixed_size_binary() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "a",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt8),
                Box::new(ArrowDataType::FixedSizeBinary(8)),
            ),
            true,
        )]));
        let keys = UInt8Array::from_iter_values(vec![0, 0, 1]);
        let values = FixedSizeBinaryArray::try_from_iter(
            vec![
                (0u8..8u8).collect::<Vec<u8>>(),
                (24u8..32u8).collect::<Vec<u8>>(),
            ]
            .into_iter(),
        )
        .unwrap();
        let arr = UInt8DictionaryArray::new(keys, Arc::new(values));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(arr)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

    #[test]
    fn test_read_nullable_structs_with_binary_dict_as_first_child_column() {
        let struct_fields = Fields::from(vec![
            Field::new(
                "city",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt8),
                    Box::new(ArrowDataType::Utf8),
                ),
                true,
            ),
            Field::new("name", ArrowDataType::Utf8, true),
        ]);
        let schema = Arc::new(Schema::new(vec![Field::new(
            "items",
            ArrowDataType::Struct(struct_fields.clone()),
            true,
        )]));

        let items_arr = StructArray::new(
            struct_fields,
            vec![
                Arc::new(DictionaryArray::new(
                    UInt8Array::from_iter_values(vec![0, 1, 1, 0, 2]),
                    Arc::new(StringArray::from_iter_values(vec![
                        "quebec",
                        "fredericton",
                        "halifax",
                    ])),
                )),
                Arc::new(StringArray::from_iter_values(vec![
                    "albert", "terry", "lance", "", "tim",
                ])),
            ],
            Some(NullBuffer::from_iter(vec![true, true, true, false, true])),
        );

        let batch = RecordBatch::try_new(schema, vec![Arc::new(items_arr)]).unwrap();
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(read.len(), 1);
        assert_eq!(&batch, &read[0])
    }

    /// Options for a single-column reader round-trip test
    #[derive(Clone)]
    struct TestOptions {
        /// Number of row groups to write
        num_row_groups: usize,
        /// Number of rows per row group
        num_rows: usize,
        /// Batch size to read back
        record_batch_size: usize,
        /// Percentage of nulls in the column, or `None` if required
        null_percent: Option<usize>,
        /// Write batch size
        write_batch_size: usize,
        /// Maximum data page size in bytes
        max_data_page_size: usize,
        /// Maximum dictionary page size in bytes
        max_dict_page_size: usize,
        /// Writer version
        writer_version: WriterVersion,
        /// Which statistics to write
        enabled_statistics: EnabledStatistics,
        /// Encoding to use
        encoding: Encoding,
        /// Row selection to apply and the number of selected rows
        row_selections: Option<(RowSelection, usize)>,
        /// Row filter mask to apply
        row_filter: Option<Vec<bool>>,
        /// Limit to apply
        limit: Option<usize>,
        /// Offset to apply
        offset: Option<usize>,
    }

    // manual Debug impl: report the (potentially large) selection and filter
    // as presence flags rather than dumping their contents
    impl std::fmt::Debug for TestOptions {
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }

    impl Default for TestOptions {
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }

    impl TestOptions {
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = rng();
            let step = rng.random_range(self.record_batch_size..self.num_rows);
            let row_selections = create_test_selection(
                step,
                self.num_row_groups * self.num_rows,
                rng.random::<bool>(),
            );
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
                ..self
            }
        }

        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }
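
    // Illustrative sketch (added example, not from the original suite): the
    // builder-style helpers above compose left to right, each overriding a
    // single field while `..self` keeps the rest.
    #[test]
    fn test_options_builder_composition() {
        let opts = TestOptions::new(2, 256, 93)
            .with_null_percent(25)
            .with_max_data_page_size(256)
            .with_limit(10);
        assert_eq!(opts.num_row_groups, 2);
        assert_eq!(opts.null_percent, Some(25));
        assert_eq!(opts.max_data_page_size, 256);
        assert_eq!(opts.limit, Some(10));
        // untouched fields keep their defaults
        assert_eq!(opts.offset, None);
    }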

    /// Create a parquet file containing a single column of values generated
    /// by `G`, then read it back under a battery of `TestOptions`, comparing
    /// the decoded batches against the expected data produced by `converter`
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        // baseline options plus variations of page sizes, null density,
        // statistics level, row selections, row filters, limits and offsets
        let all_options = vec![
            TestOptions::new(2, 100, 15),
            TestOptions::new(3, 25, 5),
            TestOptions::new(4, 100, 25),
            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
            TestOptions::new(2, 256, 127).with_null_percent(0),
            TestOptions::new(2, 256, 93).with_null_percent(25),
            TestOptions::new(4, 100, 25).with_limit(0),
            TestOptions::new(4, 100, 25).with_limit(50),
            TestOptions::new(4, 100, 25).with_limit(10),
            TestOptions::new(4, 100, 25).with_limit(101),
            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::Chunk),
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::None),
            TestOptions::new(2, 128, 91)
                .with_null_percent(100)
                .with_enabled_statistics(EnabledStatistics::None),
            TestOptions::new(2, 100, 15).with_row_selections(),
            TestOptions::new(3, 25, 5).with_row_selections(),
            TestOptions::new(4, 100, 25).with_row_selections(),
            TestOptions::new(3, 256, 73)
                .with_max_data_page_size(128)
                .with_row_selections(),
            TestOptions::new(3, 256, 57)
                .with_max_dict_page_size(128)
                .with_row_selections(),
            TestOptions::new(2, 256, 127)
                .with_null_percent(0)
                .with_row_selections(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_offset(20)
                .with_limit(10),
            TestOptions::new(4, 100, 25).with_row_filter(),
            TestOptions::new(4, 100, 25)
                .with_row_selections()
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_selections()
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_enabled_statistics(EnabledStatistics::None)
                .with_max_data_page_size(10)
                .with_row_selections(),
        ];

        all_options.into_iter().for_each(|opts| {
            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                for encoding in encodings {
                    let opts = TestOptions {
                        writer_version,
                        encoding: *encoding,
                        ..opts.clone()
                    };

                    single_column_reader_test::<T, _, G>(
                        opts,
                        rand_max,
                        converted_type,
                        arrow_type.clone(),
                        &converter,
                    )
                }
            }
        });
    }

    /// Write a single column of data generated by `G` to a parquet file using
    /// `opts`, then read it back and verify the decoded batches
    fn single_column_reader_test<T, F, G>(
        opts: TestOptions,
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        println!(
            "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
            T::get_physical_type(),
            converted_type,
            arrow_type,
            opts
        );

        // generate random definition levels when nulls are requested
        let (repetition, def_levels) = match opts.null_percent.as_ref() {
            Some(null_percent) => {
                let mut rng = rng();

                let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
                    .map(|_| {
                        std::iter::from_fn(|| {
                            Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
                        })
                        .take(opts.num_rows)
                        .collect()
                    })
                    .collect();
                (Repetition::OPTIONAL, Some(def_levels))
            }
            None => (Repetition::REQUIRED, None),
        };

        // generate random values for the non-null rows of each row group
        let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
            .map(|idx| {
                let null_count = match def_levels.as_ref() {
                    Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
                    None => 0,
                };
                G::gen_vec(rand_max, opts.num_rows - null_count)
            })
            .collect();

        let len = match T::get_physical_type() {
            crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
            crate::basic::Type::INT96 => 12,
            _ => -1,
        };

        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", T::get_physical_type())
                .with_repetition(repetition)
                .with_converted_type(converted_type)
                .with_length(len)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));

        let mut file = tempfile::tempfile().unwrap();

        generate_single_column_file_with_data::<T>(
            &values,
            def_levels.as_ref(),
            file.try_clone().unwrap(), // cannot use &mut File as it is not 'static
            schema,
            arrow_field,
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let options = ArrowReaderOptions::new()
            .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);

        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();

        let expected_data = match opts.row_selections {
            Some((selections, row_count)) => {
                let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);

                // `skip_data` accumulates the rows that survive the selection
                let mut skip_data: Vec<Option<T::T>> = vec![];
                let dequeue: VecDeque<RowSelector> = selections.clone().into();
                for select in dequeue {
                    if select.skip {
                        without_skip_data.drain(0..select.row_count);
                    } else {
                        skip_data.extend(without_skip_data.drain(0..select.row_count));
                    }
                }
                builder = builder.with_row_selection(selections);

                assert_eq!(skip_data.len(), row_count);
                skip_data
            }
            None => {
                let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
                assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
                expected_data
            }
        };

        let mut expected_data = match opts.row_filter {
            Some(filter) => {
                let expected_data = expected_data
                    .into_iter()
                    .zip(filter.iter())
                    .filter_map(|(d, f)| f.then(|| d))
                    .collect();

                let mut filter_offset = 0;
                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                    ProjectionMask::all(),
                    move |b| {
                        let array = BooleanArray::from_iter(
                            filter
                                .iter()
                                .skip(filter_offset)
                                .take(b.num_rows())
                                .map(|x| Some(*x)),
                        );
                        filter_offset += b.num_rows();
                        Ok(array)
                    },
                ))]);

                builder = builder.with_row_filter(filter);
                expected_data
            }
            None => expected_data,
        };

        if let Some(offset) = opts.offset {
            builder = builder.with_offset(offset);
            expected_data = expected_data.into_iter().skip(offset).collect();
        }

        if let Some(limit) = opts.limit {
            builder = builder.with_limit(limit);
            expected_data = expected_data.into_iter().take(limit).collect();
        }

        let mut record_reader = builder
            .with_batch_size(opts.record_batch_size)
            .build()
            .unwrap();

        let mut total_read = 0;
        loop {
            let maybe_batch = record_reader.next();
            if total_read < expected_data.len() {
                let end = min(total_read + opts.record_batch_size, expected_data.len());
                let batch = maybe_batch.unwrap().unwrap();
                assert_eq!(end - total_read, batch.num_rows());

                let a = converter(&expected_data[total_read..end]);
                let b = batch.column(0);

                assert_eq!(a.data_type(), b.data_type());
                assert_eq!(a.to_data(), b.to_data());
                assert_eq!(
                    a.as_any().type_id(),
                    b.as_any().type_id(),
                    "incorrect type ids"
                );

                total_read = end;
            } else {
                assert!(maybe_batch.is_none());
                break;
            }
        }
    }

    /// Interleave `values` with nulls according to `def_levels`: level 1
    /// consumes the next value, level 0 produces a null
    fn gen_expected_data<T: DataType>(
        def_levels: Option<&Vec<Vec<i16>>>,
        values: &[Vec<T::T>],
    ) -> Vec<Option<T::T>> {
        let data: Vec<Option<T::T>> = match def_levels {
            Some(levels) => {
                let mut values_iter = values.iter().flatten();
                levels
                    .iter()
                    .flatten()
                    .map(|d| match d {
                        1 => Some(values_iter.next().cloned().unwrap()),
                        0 => None,
                        _ => unreachable!(),
                    })
                    .collect()
            }
            None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
        };
        data
    }
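
    // Illustrative sketch (added example, not from the original suite) of the
    // mapping performed by `gen_expected_data`
    #[test]
    fn test_gen_expected_data_interleaves_nulls() {
        let def_levels: Vec<Vec<i16>> = vec![vec![1, 0, 1]];
        let values = vec![vec![7, 8]];
        let expected = gen_expected_data::<Int32Type>(Some(&def_levels), &values);
        assert_eq!(expected, vec![Some(7), None, Some(8)]);
    }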

    /// Write `values` (with optional `def_levels`), one row group per outer
    /// vec, to `file` using the writer properties from `opts`
    fn generate_single_column_file_with_data<T: DataType>(
        values: &[Vec<T::T>],
        def_levels: Option<&Vec<Vec<i16>>>,
        file: File,
        schema: TypePtr,
        field: Option<Field>,
        opts: &TestOptions,
    ) -> Result<ParquetMetaData> {
        let mut writer_props = opts.writer_props();
        if let Some(field) = field {
            let arrow_schema = Schema::new(vec![field]);
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
        }

        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

        for (idx, v) in values.iter().enumerate() {
            let def_levels = def_levels.map(|d| d[idx].as_slice());
            let mut row_group_writer = writer.next_row_group()?;
            {
                let mut column_writer = row_group_writer
                    .next_column()?
                    .expect("Column writer is none!");

                column_writer
                    .typed::<T>()
                    .write_batch(v, def_levels, None)?;

                column_writer.close()?;
            }
            row_group_writer.close()?;
        }

        writer.close()
    }

    fn get_test_file(file_name: &str) -> File {
        let mut path = PathBuf::new();
        path.push(arrow::util::test_util::arrow_test_data());
        path.push(file_name);

        File::open(path.as_path()).expect("File not found!")
    }

    #[test]
    fn test_read_structs() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }

    #[test]
    fn test_read_structs_by_name() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::columns(
            builder.parquet_schema(),
            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
        );
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }

    #[test]
    fn test_read_maps() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_maps.snappy.parquet");
        let file = File::open(path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }
    }

    #[test]
    fn test_nested_nullability() {
        let message_type = "message nested {
            OPTIONAL Group group {
                REQUIRED INT32 leaf;
            }
        }";

        let file = tempfile::tempfile().unwrap();
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        {
            let mut writer =
                SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                    .unwrap();

            {
                let mut row_group_writer = writer.next_row_group().unwrap();
                let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

                column_writer
                    .typed::<Int32Type>()
                    .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                    .unwrap();

                column_writer.close().unwrap();
                row_group_writer.close().unwrap();
            }

            writer.close().unwrap();
        }

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

        let reader = builder.with_projection(mask).build().unwrap();

        let expected_schema = Schema::new(vec![Field::new(
            "group",
            ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
            true,
        )]);

        let batch = reader.into_iter().next().unwrap().unwrap();
        assert_eq!(batch.schema().as_ref(), &expected_schema);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(batch.column(0).null_count(), 2);
    }

    #[test]
    fn test_invalid_utf8() {
        // a parquet file containing a single string column whose data is not
        // valid UTF-8
        let data = vec![
            80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
            18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
            3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
            2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
            111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
            21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
            38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
            38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
            0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
            118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
            116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
            82, 49,
        ];

        let file = Bytes::from(data);
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

        let error = record_batch_reader.next().unwrap().unwrap_err();

        assert!(
            error.to_string().contains("invalid utf-8 sequence"),
            "{}",
            error
        );
    }

    #[test]
    fn test_invalid_utf8_string_array() {
        test_invalid_utf8_string_array_inner::<i32>();
    }

    #[test]
    fn test_invalid_utf8_large_string_array() {
        test_invalid_utf8_string_array_inner::<i64>();
    }

    fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
        let cases = [
            invalid_utf8_first_char::<O>(),
            invalid_utf8_first_char_long_strings::<O>(),
            invalid_utf8_later_char::<O>(),
            invalid_utf8_later_char_long_strings::<O>(),
            invalid_utf8_later_char_really_long_strings::<O>(),
            invalid_utf8_later_char_really_long_strings2::<O>(),
        ];
        for array in &cases {
            for encoding in STRING_ENCODINGS {
                // the data is not valid UTF-8, so a StringArray cannot be
                // constructed safely; deliberately bypass validation
                let array = unsafe {
                    GenericStringArray::<O>::new_unchecked(
                        array.offsets().clone(),
                        array.values().clone(),
                        array.nulls().cloned(),
                    )
                };
                let data_type = array.data_type().clone();
                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
                let err = read_from_parquet(data).unwrap_err();
                let expected_err =
                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
                assert!(
                    err.to_string().contains(expected_err),
                    "data type: {data_type}, expected: {expected_err}, got: {err}"
                );
            }
        }
    }

    #[test]
    fn test_invalid_utf8_string_view_array() {
        let cases = [
            invalid_utf8_first_char::<i32>(),
            invalid_utf8_first_char_long_strings::<i32>(),
            invalid_utf8_later_char::<i32>(),
            invalid_utf8_later_char_long_strings::<i32>(),
            invalid_utf8_later_char_really_long_strings::<i32>(),
            invalid_utf8_later_char_really_long_strings2::<i32>(),
        ];

        for encoding in STRING_ENCODINGS {
            for array in &cases {
                let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
                let array = array.as_binary_view();

                // the data is not valid UTF-8, so a StringViewArray cannot be
                // constructed safely; deliberately bypass validation
                let array = unsafe {
                    StringViewArray::new_unchecked(
                        array.views().clone(),
                        array.data_buffers().to_vec(),
                        array.nulls().cloned(),
                    )
                };

                let data_type = array.data_type().clone();
                let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
                let err = read_from_parquet(data).unwrap_err();
                let expected_err =
                    "Parquet argument error: Parquet error: encountered non UTF-8 data";
                assert!(
                    err.to_string().contains(expected_err),
                    "data type: {data_type}, expected: {expected_err}, got: {err}"
                );
            }
        }
    }

    /// Encodings exercised by the invalid UTF-8 tests; `None` uses the
    /// default writer properties
    const STRING_ENCODINGS: &[Option<Encoding>] = &[
        None,
        Some(Encoding::PLAIN),
        Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
        Some(Encoding::DELTA_BYTE_ARRAY),
    ];

    /// Invalid UTF-8 sequence in the first character of the string
    const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

    /// Invalid UTF-8 sequence after the first few characters of the string
    const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];

    /// Binary data with an invalid UTF-8 sequence in the first character
    fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let invalid = INVALID_UTF8_FIRST_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
    }

    /// Binary data with an invalid UTF-8 sequence in the first character of a
    /// string longer than 12 bytes
    fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// Binary data with an invalid UTF-8 sequence after the first character
    fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
    }

    /// Binary data with an invalid UTF-8 sequence later in a long string
    fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// Binary data with an invalid UTF-8 sequence at the end of a very long
    /// string
    fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut invalid = vec![];
        for _ in 0..10 {
            invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        }
        invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
        GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
    }

    /// As above, but with a long valid string alongside the invalid one
    fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
        let valid: &[u8] = b" ";
        let mut valid_long = vec![];
        for _ in 0..10 {
            valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
        }
        let invalid = INVALID_UTF8_LATER_CHAR;
        GenericBinaryArray::<O>::from_iter(vec![
            None,
            Some(valid),
            Some(invalid),
            None,
            Some(&valid_long),
            Some(valid),
        ])
    }

    /// Write `array` to an in-memory parquet buffer with the requested
    /// `encoding` (`None` uses the default writer properties)
    fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
        let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
        let mut data = vec![];
        let schema = batch.schema();
        let props = encoding.map(|encoding| {
            WriterProperties::builder()
                // disable the dictionary so the requested encoding is actually used
                .set_dictionary_enabled(false)
                .set_encoding(encoding)
                .build()
        });

        {
            let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
            writer.write(&batch).unwrap();
            writer.flush().unwrap();
            writer.close().unwrap();
        };
        data
    }

    /// Read all record batches back from an in-memory parquet buffer
    fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
        let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
            .unwrap()
            .build()
            .unwrap();

        reader.collect()
    }
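
    // Illustrative sketch (added example, not from the original suite): valid
    // UTF-8 data round-trips cleanly through the two helpers above
    #[test]
    fn test_valid_utf8_roundtrip() {
        let array = StringArray::from(vec!["a", "b", "c"]);
        let data = write_to_parquet_with_encoding(Arc::new(array), Some(Encoding::PLAIN));
        let batches = read_from_parquet(data).unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 3);
    }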

    #[test]
    fn test_dictionary_preservation() {
        let fields = vec![Arc::new(
            Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
                .with_repetition(Repetition::OPTIONAL)
                .with_converted_type(ConvertedType::UTF8)
                .build()
                .unwrap(),
        )];

        let schema = Arc::new(
            Type::group_type_builder("test_schema")
                .with_fields(fields)
                .build()
                .unwrap(),
        );

        let dict_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::Int32),
            Box::new(ArrowDataType::Utf8),
        );

        let arrow_field = Field::new("leaf", dict_type, true);

        let mut file = tempfile::tempfile().unwrap();

        let values = vec![
            vec![
                ByteArray::from("hello"),
                ByteArray::from("a"),
                ByteArray::from("b"),
                ByteArray::from("d"),
            ],
            vec![
                ByteArray::from("c"),
                ByteArray::from("a"),
                ByteArray::from("b"),
            ],
        ];

        let def_levels = vec![
            vec![1, 0, 0, 1, 0, 0, 1, 1],
            vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
        ];

        let opts = TestOptions {
            encoding: Encoding::RLE_DICTIONARY,
            ..Default::default()
        };

        generate_single_column_file_with_data::<ByteArrayType>(
            &values,
            Some(&def_levels),
            file.try_clone().unwrap(), // cannot use &mut File as it is not 'static
            schema,
            Some(arrow_field),
            &opts,
        )
        .unwrap();

        file.rewind().unwrap();

        let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

        let batches = record_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();

        assert_eq!(batches.len(), 6);
        assert!(batches.iter().all(|x| x.num_columns() == 1));

        let row_counts = batches
            .iter()
            .map(|x| (x.num_rows(), x.column(0).null_count()))
            .collect::<Vec<_>>();

        assert_eq!(
            row_counts,
            vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
        );

        let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

        // batches within a row group should share the same dictionary; batch 2
        // straddles the row group boundary, so its dictionary matches neither
        // neighbour
        assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
        assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
        assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
        assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
        assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
    }

    #[test]
    fn test_read_null_list() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();
        let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        let batch = record_batch_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.column(0).len(), 1);

        let list = batch
            .column(0)
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        assert_eq!(list.len(), 1);
        assert!(list.is_valid(0));

        let val = list.value(0);
        assert_eq!(val.len(), 0);
    }

    #[test]
    fn test_null_schema_inference() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/null_list.parquet");
        let file = File::open(path).unwrap();

        let arrow_field = Field::new(
            "emptylist",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
            true,
        );

        let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
        let schema = builder.schema();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0), &arrow_field);
    }

    #[test]
    fn test_skip_metadata() {
        let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
        let field = Field::new("col", col.data_type().clone(), true);

        let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

        let metadata = [("key".to_string(), "value".to_string())]
            .into_iter()
            .collect();

        let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

        assert_ne!(schema_with_metadata, schema_without_metadata);

        let batch =
            RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

        let file = |version: WriterVersion| {
            let props = WriterProperties::builder()
                .set_writer_version(version)
                .build();

            let file = tempfile().unwrap();
            let mut writer =
                ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                    .unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
            file
        };

        let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

        let v1_reader = file(WriterVersion::PARQUET_1_0);
        let v2_reader = file(WriterVersion::PARQUET_2_0);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
                .unwrap()
                .build()
                .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);

        let arrow_reader =
            ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
        assert_eq!(arrow_reader.schema(), schema_with_metadata);

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
            .unwrap()
            .build()
            .unwrap();
        assert_eq!(reader.schema(), schema_without_metadata);
    }

    /// Build a record batch from `value` and write it to a temporary parquet
    /// file, returning the file handle
    fn write_parquet_from_iter<I, F>(value: I) -> File
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let batch = RecordBatch::try_from_iter(value).unwrap();
        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    }
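
    // Illustrative sketch (added example, not from the original suite): the
    // temporary file returned by `write_parquet_from_iter` can be read back
    // directly without rewinding, matching how the other tests use it
    #[test]
    fn test_write_parquet_from_iter_roundtrip() {
        let file = write_parquet_from_iter(vec![(
            "c0",
            Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let mut reader = ParquetRecordBatchReader::try_new(file, 1024).unwrap();
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
    }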

    /// Write `value` to a parquet file and assert that opening it with the
    /// supplied `schema` fails with `expected_error`
    fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
    where
        I: IntoIterator<Item = (F, ArrayRef)>,
        F: AsRef<str>,
    {
        let file = write_parquet_from_iter(value);
        let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options_with_schema,
        );
        assert_eq!(builder.err().unwrap().to_string(), expected_error);
    }

    #[test]
    fn test_schema_too_few_columns() {
        run_schema_test_with_error(
            vec![
                ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![Field::new(
                "int64",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
        );
    }

    #[test]
    fn test_schema_too_many_columns() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![
                Field::new("int64", ArrowDataType::Int64, false),
                Field::new("int32", ArrowDataType::Int32, false),
            ])),
            "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
        );
    }

    #[test]
    fn test_schema_mismatched_column_names() {
        run_schema_test_with_error(
            vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
            Arc::new(Schema::new(vec![Field::new(
                "other",
                ArrowDataType::Int64,
                false,
            )])),
            "Arrow: incompatible arrow schema, expected field named int64 got other",
        );
    }

    #[test]
    fn test_schema_incompatible_columns() {
        run_schema_test_with_error(
            vec![
                (
                    "col1_invalid",
                    Arc::new(Int64Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col2_valid",
                    Arc::new(Int32Array::from(vec![0])) as ArrayRef,
                ),
                (
                    "col3_invalid",
                    Arc::new(Date64Array::from(vec![0])) as ArrayRef,
                ),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1_invalid", ArrowDataType::Int32, false),
                Field::new("col2_valid", ArrowDataType::Int32, false),
                Field::new("col3_invalid", ArrowDataType::Int32, false),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field col1_invalid: requested Int32 but found Int64, data type mismatch for field col3_invalid: requested Int32 but found Int64",
        );
    }

    #[test]
    fn test_one_incompatible_nested_column() {
        let nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int64, false),
        ]);
        let nested = StructArray::try_new(
            nested_fields,
            vec![
                Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ],
            None,
        )
        .expect("struct array");
        let supplied_nested_fields = Fields::from(vec![
            Field::new("nested1_valid", ArrowDataType::Utf8, false),
            Field::new("nested1_invalid", ArrowDataType::Int32, false),
        ]);
        run_schema_test_with_error(
            vec![
                ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
                ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
                ("nested", Arc::new(nested) as ArrayRef),
            ],
            Arc::new(Schema::new(vec![
                Field::new("col1", ArrowDataType::Int64, false),
                Field::new("col2", ArrowDataType::Int32, false),
                Field::new(
                    "nested",
                    ArrowDataType::Struct(supplied_nested_fields),
                    false,
                ),
            ])),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field nested: \
            requested Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int32) \
            but found Struct(\"nested1_valid\": non-null Utf8, \"nested1_invalid\": non-null Int64)",
        );
    }

    /// Write a single Utf8 column to an in-memory parquet file
    fn utf8_parquet() -> Bytes {
        let input = StringArray::from_iter_values(vec!["foo", "bar", "baz"]);
        let batch = RecordBatch::try_from_iter(vec![("column1", Arc::new(input) as _)]).unwrap();
        let props = None;
        let mut parquet_data = vec![];
        let mut writer = ArrowWriter::try_new(&mut parquet_data, batch.schema(), props).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        Bytes::from(parquet_data)
    }

    #[test]
    fn test_schema_error_bad_types() {
        let parquet_data = utf8_parquet();

        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
            "column1",
            arrow::datatypes::DataType::Int32,
            false,
        )]));

        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
        let err =
            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
                .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Arrow: Incompatible supplied Arrow schema: data type mismatch for field column1: requested Int32 but found Utf8"
        )
    }

    #[test]
    fn test_schema_error_bad_nullability() {
        let parquet_data = utf8_parquet();

        let input_schema: SchemaRef = Arc::new(Schema::new(vec![Field::new(
            "column1",
            arrow::datatypes::DataType::Utf8,
            true,
        )]));

        let reader_options = ArrowReaderOptions::new().with_schema(input_schema.clone());
        let err =
            ParquetRecordBatchReaderBuilder::try_new_with_options(parquet_data, reader_options)
                .unwrap_err();
        assert_eq!(
            err.to_string(),
            "Arrow: Incompatible supplied Arrow schema: nullability mismatch for field column1: expected true but found false"
        )
    }

    #[test]
    fn test_read_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![
            (
                "binary_to_utf8",
                Arc::new(BinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "large_binary_to_large_utf8",
                Arc::new(LargeBinaryArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
            (
                "binary_view_to_utf8_view",
                Arc::new(BinaryViewArray::from(vec![
                    b"one".as_ref(),
                    b"two".as_ref(),
                    b"three".as_ref(),
                ])) as ArrayRef,
            ),
        ]);
        let supplied_fields = Fields::from(vec![
            Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
            Field::new(
                "large_binary_to_large_utf8",
                ArrowDataType::LargeUtf8,
                false,
            ),
            Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
        ]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_string::<i32>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch
                .column(1)
                .as_string::<i64>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );

        assert_eq!(
            batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
            vec![Some("one"), Some("two"), Some("three")]
        );
    }

    #[test]
    #[should_panic(expected = "Invalid UTF8 sequence at")]
    fn test_read_non_utf8_binary_as_utf8() {
        let file = write_parquet_from_iter(vec![(
            "non_utf8_binary",
            Arc::new(BinaryArray::from(vec![
                b"\xDE\x00\xFF".as_ref(),
                b"\xDE\x01\xAA".as_ref(),
                b"\xDE\x02\xFF".as_ref(),
            ])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new(
            "non_utf8_binary",
            ArrowDataType::Utf8,
            false,
        )]);

        let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");
        arrow_reader.next().unwrap().unwrap_err();
    }

    #[test]
    fn test_with_schema() {
        let nested_fields = Fields::from(vec![
            Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
            Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
        ]);

        let nested_arrays: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
        ];

        let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

        let file = write_parquet_from_iter(vec![
            (
                "int32_to_ts_second",
                Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            (
                "date32_to_date64",
                Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
            ),
            ("nested", Arc::new(nested) as ArrayRef),
        ]);

        let supplied_nested_fields = Fields::from(vec![
            Field::new(
                "utf8_to_dict",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::Int32),
                    Box::new(ArrowDataType::Utf8),
                ),
                false,
            ),
            Field::new(
                "int64_to_ts_nano",
                ArrowDataType::Timestamp(
                    arrow::datatypes::TimeUnit::Nanosecond,
                    Some("+10:00".into()),
                ),
                false,
            ),
        ]);

        let supplied_schema = Arc::new(Schema::new(vec![
            Field::new(
                "int32_to_ts_second",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
                false,
            ),
            Field::new("date32_to_date64", ArrowDataType::Date64, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ]));

        let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        assert_eq!(arrow_reader.schema(), supplied_schema);
        let batch = arrow_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);
        assert_eq!(
            batch
                .column(0)
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .expect("downcast to timestamp second")
                .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 01:00:00 +01:00"
        );
        assert_eq!(
            batch
                .column(1)
                .as_any()
                .downcast_ref::<Date64Array>()
                .expect("downcast to date64")
                .value_as_date(0)
                .map(|v| v.to_string())
                .expect("value as date"),
            "1970-01-01"
        );

        let nested = batch
            .column(2)
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("downcast to struct");

        let nested_dict = nested
            .column(0)
            .as_any()
            .downcast_ref::<Int32DictionaryArray>()
            .expect("downcast to dictionary");

        assert_eq!(
            nested_dict
                .values()
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("downcast to string")
                .iter()
                .collect::<Vec<_>>(),
            vec![Some("a"), Some("b")]
        );

        assert_eq!(
            nested_dict.keys().iter().collect::<Vec<_>>(),
            vec![Some(0), Some(0), Some(0), Some(1)]
        );

        assert_eq!(
            nested
                .column(1)
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .expect("downcast to timestamp nanosecond")
                .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
                .map(|v| v.to_string())
                .expect("value as datetime"),
            "1970-01-01 10:00:00.000000001 +10:00"
        );
    }

    #[test]
    fn test_empty_projection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_plain.parquet");
        let file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let file_metadata = builder.metadata().file_metadata();
        let expected_rows = file_metadata.num_rows() as usize;

        let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
        let batch_reader = builder
            .with_projection(mask)
            .with_batch_size(2)
            .build()
            .unwrap();

        let mut total_rows = 0;
        for maybe_batch in batch_reader {
            let batch = maybe_batch.unwrap();
            total_rows += batch.num_rows();
            assert_eq!(batch.num_columns(), 0);
            assert!(batch.num_rows() <= 2);
        }

        assert_eq!(total_rows, expected_rows);
    }

    /// Write two batches of `batch_size` list rows with the given row group
    /// size, then read back with the same batch size and check that full
    /// batches are returned
    fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "list",
            ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
            true,
        )]));

        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(
            &mut buf,
            schema.clone(),
            Some(
                WriterProperties::builder()
                    .set_max_row_group_size(row_group_size)
                    .build(),
            ),
        )
        .unwrap();
        for _ in 0..2 {
            let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
            for _ in 0..(batch_size) {
                list_builder.append(true);
            }
            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
                .unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        let mut record_reader =
            ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
        assert_eq!(
            batch_size,
            record_reader.next().unwrap().unwrap().num_rows()
        );
    }

    #[test]
    fn test_row_group_exact_multiple() {
        const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
        test_row_group_batch(8, 8);
        test_row_group_batch(10, 8);
        test_row_group_batch(8, 10);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
        test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
        test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
    }

    /// Given a `RecordBatch` containing all the column data, return the
    /// batches the reader is expected to emit for the given `selection` and
    /// `batch_size`
    fn get_expected_batches(
        column: &RecordBatch,
        selection: &RowSelection,
        batch_size: usize,
    ) -> Vec<RecordBatch> {
        let mut expected_batches = vec![];

        let mut selection: VecDeque<_> = selection.clone().into();
        let mut row_offset = 0;
        let mut last_start = None;
        while row_offset < column.num_rows() && !selection.is_empty() {
            let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
            while batch_remaining > 0 && !selection.is_empty() {
                let (to_read, skip) = match selection.front_mut() {
                    Some(selection) if selection.row_count > batch_remaining => {
                        selection.row_count -= batch_remaining;
                        (batch_remaining, selection.skip)
                    }
                    Some(_) => {
                        let select = selection.pop_front().unwrap();
                        (select.row_count, select.skip)
                    }
                    None => break,
                };

                batch_remaining -= to_read;

                match skip {
                    true => {
                        if let Some(last_start) = last_start.take() {
                            expected_batches.push(column.slice(last_start, row_offset - last_start))
                        }
                        row_offset += to_read
                    }
                    false => {
                        last_start.get_or_insert(row_offset);
                        row_offset += to_read
                    }
                }
            }
        }

        if let Some(last_start) = last_start.take() {
            expected_batches.push(column.slice(last_start, row_offset - last_start))
        }

        // sanity check: every batch but the last should be a full batch
        for batch in &expected_batches[..expected_batches.len() - 1] {
            assert_eq!(batch.num_rows(), batch_size);
        }

        expected_batches
    }

    /// Build a selection of alternating read/skip runs of `step_len` rows
    /// covering `total_len` rows, starting with a skip if `skip_first` is
    /// set. Returns the selection and the number of selected rows.
    fn create_test_selection(
        step_len: usize,
        total_len: usize,
        skip_first: bool,
    ) -> (RowSelection, usize) {
        let mut remaining = total_len;
        let mut skip = skip_first;
        let mut vec = vec![];
        let mut selected_count = 0;
        while remaining != 0 {
            let step = if remaining > step_len {
                step_len
            } else {
                remaining
            };
            vec.push(RowSelector {
                row_count: step,
                skip,
            });
            remaining -= step;
            if !skip {
                selected_count += step;
            }
            skip = !skip;
        }
        (vec.into(), selected_count)
    }
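
    // Illustrative sketch (added example, not from the original suite): 5 rows
    // chunked into runs of 2 yields read 2, skip 2, read 1
    #[test]
    fn test_create_test_selection_shape() {
        let (selection, selected) = create_test_selection(2, 5, false);
        let selectors: VecDeque<RowSelector> = selection.into();
        assert_eq!(
            selectors.into_iter().collect::<Vec<_>>(),
            vec![
                RowSelector {
                    row_count: 2,
                    skip: false
                },
                RowSelector {
                    row_count: 2,
                    skip: true
                },
                RowSelector {
                    row_count: 1,
                    skip: false
                },
            ]
        );
        assert_eq!(selected, 3);
    }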

    #[test]
    fn test_scan_row_with_selection() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
        let test_file = File::open(&path).unwrap();

        let mut serial_reader =
            ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
        let data = serial_reader.next().unwrap().unwrap();

        // `selection_len` only labels the assertion message; the selection
        // itself is chunked by `batch_size` so the expected batches stay full
        let do_test = |batch_size: usize, selection_len: usize| {
            for skip_first in [false, true] {
                let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

                let expected = get_expected_batches(&data, &selections, batch_size);
                let skip_reader = create_skip_reader(&test_file, batch_size, selections);
                assert_eq!(
                    skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                    expected,
                    "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
                );
            }
        };

        do_test(1000, 1000);

        do_test(20, 20);

        do_test(20, 5);

        do_test(20, 5);

        fn create_skip_reader(
            test_file: &File,
            batch_size: usize,
            selections: RowSelection,
        ) -> ParquetRecordBatchReader {
            let options = ArrowReaderOptions::new().with_page_index(true);
            let file = test_file.try_clone().unwrap();
            ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                .unwrap()
                .with_batch_size(batch_size)
                .with_row_selection(selections)
                .build()
                .unwrap()
        }
    }

    #[test]
    fn test_batch_size_overallocate() {
        let testdata = arrow::util::test_util::parquet_test_data();
        // `alltypes_plain.parquet` only contains 8 rows
        let path = format!("{testdata}/alltypes_plain.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let num_rows = builder.metadata.file_metadata().num_rows();
        let reader = builder
            .with_batch_size(1024)
            .with_projection(ProjectionMask::all())
            .build()
            .unwrap();
        assert_ne!(1024, num_rows);
        // the requested batch size should be clamped to the total row count
        assert_eq!(reader.read_plan.batch_size(), num_rows as usize);
    }

    #[test]
    fn test_read_with_page_index_enabled() {
        let testdata = arrow::util::test_util::parquet_test_data();

        {
            let path = format!("{testdata}/alltypes_tiny_pages.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index(true),
            )
            .unwrap();
            assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 8);
        }

        {
            let path = format!("{testdata}/alltypes_plain.parquet");
            let test_file = File::open(path).unwrap();
            let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
                test_file,
                ArrowReaderOptions::new().with_page_index(true),
            )
            .unwrap();
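            // alltypes_plain.parquet appears to predate page indexes, so none
            // is loaded even though one was requested above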
            assert!(builder.metadata().offset_index().is_none());
            let reader = builder.build().unwrap();
            let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(batches.len(), 1);
        }
    }

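    // Writes one row through the low-level SerializedFileWriter and verifies
    // that projecting each leaf column individually yields the same values as
    // reading all three leaves at once.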
    #[test]
    fn test_raw_repetition() {
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

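        // leaf 0: eventType (OPTIONAL INT32)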
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
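        // leaf 1: category (REPEATED INT32)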
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
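        // leaf 2: filter.error (OPTIONAL INT32 inside a REPEATED group)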
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }

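    // lz4_raw_compressed.parquet from parquet-testing uses the LZ4_RAW codec;
    // the expected values below mirror that reference file.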
    #[test]
    fn test_read_lz4_raw() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/lz4_raw_compressed.parquet");
        let file = File::open(path).unwrap();

        let batches = ParquetRecordBatchReader::try_new(file, 1024)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.num_rows(), 4);

        let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
        assert_eq!(
            a.values(),
            &[1593604800, 1593604800, 1593604801, 1593604801]
        );

        let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);

        let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
        assert_eq!(a.values(), &[42.0, 7.7, 42.125, 7.7]);
    }

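    // Both files declare the legacy LZ4 codec, but one uses Hadoop framing and
    // the other raw LZ4; decoding is expected to fall back between the two
    // framings, so both files should yield identical data.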
    #[test]
    fn test_read_lz4_hadoop_fallback() {
        for file in [
            "hadoop_lz4_compressed.parquet",
            "non_hadoop_lz4_compressed.parquet",
        ] {
            let testdata = arrow::util::test_util::parquet_test_data();
            let path = format!("{testdata}/{file}");
            let file = File::open(path).unwrap();
            let expected_rows = 4;

            let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
                .unwrap()
                .collect::<Result<Vec<_>, _>>()
                .unwrap();
            assert_eq!(batches.len(), 1);
            let batch = &batches[0];

            assert_eq!(batch.num_columns(), 3);
            assert_eq!(batch.num_rows(), expected_rows);

            let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
            assert_eq!(
                a.values(),
                &[1593604800, 1593604800, 1593604801, 1593604801]
            );

            let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
            let b: Vec<_> = b.iter().flatten().collect();
            assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);

            let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
            assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
        }
    }

    #[test]
    fn test_read_lz4_hadoop_large() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
        let file = File::open(path).unwrap();
        let expected_rows = 10000;

        let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), expected_rows);

        let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
        let a: Vec<_> = a.iter().flatten().collect();
        assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
        assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
        assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
        assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
    }

    #[test]
    #[cfg(feature = "snap")]
    fn test_read_nested_lists() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_lists.snappy.parquet");
        let file = File::open(path).unwrap();

        let f = file.try_clone().unwrap();
        let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
        let expected = reader.next().unwrap().unwrap();
        assert_eq!(expected.num_rows(), 3);

        let selection = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
        ]);
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .with_row_selection(selection)
            .build()
            .unwrap();

        let actual = reader.next().unwrap().unwrap();
        assert_eq!(actual.num_rows(), 1);
        assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
    }

    #[test]
    fn test_arbitrary_decimal() {
        let values = [1, 2, 3, 4, 5, 6, 7, 8];
        let decimals_19_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(19, 0)
            .unwrap();
        let decimals_12_0 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(12, 0)
            .unwrap();
        let decimals_17_10 = Decimal128Array::from_iter_values(values)
            .with_precision_and_scale(17, 10)
            .unwrap();

        let written = RecordBatch::try_from_iter([
            ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
            ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
            ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 8), &read[0]);
    }

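    // The writer properties force (at most) one row per data page, so the
    // skip(2) below must step over whole pages of list data.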
    #[test]
    fn test_list_skip() {
        let mut list = ListBuilder::new(Int32Builder::new());
        list.append_value([Some(1), Some(2)]);
        list.append_value([Some(3)]);
        list.append_value([Some(4)]);
        let list = list.finish();
        let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();

        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1)
            .set_write_batch_size(2)
            .build();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
            .unwrap()
            .with_row_selection(selection.into())
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out, batch.slice(2, 1));
    }

    #[test]
    fn test_row_selection_interleaved_skip() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            ArrowDataType::Int32,
            false,
        )]));

        let values = Int32Array::from(vec![0, 1, 2, 3, 4]);
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None).unwrap();
        writer.write(&batch)?;
        writer.close()?;

        let selection = RowSelection::from(vec![
            RowSelector::select(1),
            RowSelector::skip(2),
            RowSelector::select(2),
        ]);

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))?
            .with_batch_size(4)
            .with_row_selection(selection)
            .build()?;

        let out = reader.next().unwrap()?;
        assert_eq!(out.num_rows(), 3);
        let values = out
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>()
            .values();
        assert_eq!(values, &[0, 3, 4]);
        assert!(reader.next().is_none());
        Ok(())
    }

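    // Selects every other row, yielding many short selectors relative to the
    // row count (checked by the assert below), which should favor the
    // bitmask-based selection representation.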
    #[test]
    fn test_row_selection_mask_sparse_rows() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "v",
            ArrowDataType::Int32,
            false,
        )]));

        let values = Int32Array::from((0..30).collect::<Vec<i32>>());
        let batch = RecordBatch::try_from_iter([("v", Arc::new(values) as ArrayRef)])?;

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), None)?;
        writer.write(&batch)?;
        writer.close()?;

        let total_rows = batch.num_rows();
        let ranges = (1..total_rows)
            .step_by(2)
            .map(|i| i..i + 1)
            .collect::<Vec<_>>();
        let selection = RowSelection::from_consecutive_ranges(ranges.into_iter(), total_rows);

        let selectors: Vec<RowSelector> = selection.clone().into();
        assert!(total_rows < selectors.len() * 8);

        let bytes = Bytes::from(buffer);

        let reader = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())?
            .with_batch_size(7)
            .with_row_selection(selection)
            .build()?;

        let mut collected = Vec::new();
        for batch in reader {
            let batch = batch?;
            collected.extend_from_slice(
                batch
                    .column(0)
                    .as_primitive::<arrow_array::types::Int32Type>()
                    .values(),
            );
        }

        let expected: Vec<i32> = (1..total_rows).step_by(2).map(|i| i as i32).collect();
        assert_eq!(collected, expected);
        Ok(())
    }

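    // Round-trip helpers driven by test_decimal below. Decimal32 with
    // precision <= 9 is expected to use the INT32 physical type.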
    fn test_decimal32_roundtrip() {
        let d = |values: Vec<i32>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal32Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let batch = RecordBatch::try_from_iter([("d1", Arc::new(d1) as ArrayRef)]).unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

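    // Decimal64: precision <= 9 still maps to INT32, while precision up to 18
    // maps to INT64; the largest values sit just inside each precision.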
    fn test_decimal64_roundtrip() {
        let d = |values: Vec<i64>, p: u8| {
            let iter = values.into_iter();
            PrimitiveArray::<Decimal64Type>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

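    // Generic version for Decimal128/Decimal256: precision 19 no longer fits
    // in an INT64, so the fourth column must use FIXED_LEN_BYTE_ARRAY.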
    fn test_decimal_roundtrip<T: DecimalType>() {
        let d = |values: Vec<usize>, p: u8| {
            let iter = values.into_iter().map(T::Native::usize_as);
            PrimitiveArray::<T>::from_iter_values(iter)
                .with_precision_and_scale(p, 2)
                .unwrap()
        };

        let d1 = d(vec![1, 2, 3, 4, 5], 9);
        let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
        let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
        let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);

        let batch = RecordBatch::try_from_iter([
            ("d1", Arc::new(d1) as ArrayRef),
            ("d2", Arc::new(d2) as ArrayRef),
            ("d3", Arc::new(d3) as ArrayRef),
            ("d4", Arc::new(d4) as ArrayRef),
        ])
        .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
        let t1 = builder.parquet_schema().columns()[0].physical_type();
        assert_eq!(t1, PhysicalType::INT32);
        let t2 = builder.parquet_schema().columns()[1].physical_type();
        assert_eq!(t2, PhysicalType::INT64);
        let t3 = builder.parquet_schema().columns()[2].physical_type();
        assert_eq!(t3, PhysicalType::INT64);
        let t4 = builder.parquet_schema().columns()[3].physical_type();
        assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        assert_eq!(batch.schema(), reader.schema());

        let out = reader.next().unwrap().unwrap();
        assert_eq!(batch, out);
    }

    #[test]
    fn test_decimal() {
        test_decimal32_roundtrip();
        test_decimal64_roundtrip();
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }

    #[test]
    fn test_list_selection() {
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list_field(ArrowDataType::Utf8, true),
            false,
        )]));
        let mut buf = Vec::with_capacity(1024);

        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        for i in 0..2 {
            let mut list_a_builder = ListBuilder::new(StringBuilder::new());
            for j in 0..1024 {
                list_a_builder.values().append_value(format!("{i} {j}"));
                list_a_builder.append(true);
            }
            let batch =
                RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
                    .unwrap();
            writer.write(&batch).unwrap();
        }
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);
        let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ]))
            .build()
            .unwrap();

        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let batch = concat_batches(&schema, &batches).unwrap();

        assert_eq!(batch.num_rows(), 924 * 2);
        let list = batch.column(0).as_list::<i32>();

        for w in list.value_offsets().windows(2) {
            assert_eq!(w[0] + 1, w[1])
        }
        let mut values = list.values().as_string::<i32>().iter();

        for i in 0..2 {
            for j in 100..1024 {
                let expected = format!("{i} {j}");
                assert_eq!(values.next().unwrap().unwrap(), &expected);
            }
        }
    }

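    // Fuzzes nested list reads: builds a randomized list-of-lists column with
    // nulls at every level, then checks several skip/select patterns at
    // multiple batch sizes against slices of the written batch.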
    #[test]
    fn test_list_selection_fuzz() {
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }

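    // old_list_structure.parquet uses the legacy two-level list encoding; the
    // expected inner value is the list [[1, 2], [3, 4]] built below.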
    #[test]
    fn test_read_old_nested_list() {
        use arrow::datatypes::DataType;
        use arrow::datatypes::ToByteSlice;

        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/old_list_structure.parquet");
        let test_file = File::open(path).unwrap();

        let a_values = Int32Array::from(vec![1, 2, 3, 4]);

        let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());

        let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
            "array",
            DataType::Int32,
            false,
        ))))
        .len(2)
        .add_buffer(a_value_offsets)
        .add_child_data(a_values.into_data())
        .build()
        .unwrap();
        let a = ListArray::from(a_list_data);

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 1);
        assert_eq!(out.num_columns(), 1);
        let c0 = out.column(0);
        let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
        let r0 = c0arr.value(0);
        let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
        assert_eq!(r0arr, &a);
    }

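    // map_no_value.parquet appears to contain a map column whose value field
    // is absent alongside equivalent list columns; the decoded lists should
    // match each other element for element.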
    #[test]
    fn test_map_no_value() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/map_no_value.parquet");
        let file = File::open(path).unwrap();

        let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
            .unwrap()
            .build()
            .unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 3);
        let c0 = out.column(1).as_list::<i32>();
        let c1 = out.column(2).as_list::<i32>();
        assert_eq!(c0.len(), c1.len());
        c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
    }

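    // Writes 12 rows with two-row data pages, then filters for only the first
    // and last rows, so every interior page is skipped in its entirety; the
    // reader must still assemble the two matching rows.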
    #[test]
    fn test_row_filter_full_page_skip_is_handled() {
        let first_value: i64 = 1111;
        let last_value: i64 = 9999;
        let num_rows: usize = 12;

        let schema = Arc::new(Schema::new(vec![
            Field::new("key", arrow_schema::DataType::Int64, false),
            Field::new("value", arrow_schema::DataType::Int64, false),
        ]));

        let mut int_values: Vec<i64> = (0..num_rows as i64).collect();
        int_values[0] = first_value;
        int_values[num_rows - 1] = last_value;
        let keys = Int64Array::from(int_values.clone());
        let values = Int64Array::from(int_values.clone());
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(keys) as ArrayRef, Arc::new(values) as ArrayRef],
        )
        .unwrap();

        let props = WriterProperties::builder()
            .set_write_batch_size(2)
            .set_data_page_row_count_limit(2)
            .build();

        let mut buffer = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        let data = Bytes::from(buffer);

        let options = ArrowReaderOptions::new().with_page_index(true);
        let builder =
            ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options).unwrap();
        let schema = builder.parquet_schema().clone();
        let filter_mask = ProjectionMask::leaves(&schema, [0]);

        let make_predicate = |mask: ProjectionMask| {
            ArrowPredicateFn::new(mask, move |batch: RecordBatch| {
                let column = batch.column(0);
                let match_first = eq(column, &Int64Array::new_scalar(first_value))?;
                let match_second = eq(column, &Int64Array::new_scalar(last_value))?;
                or(&match_first, &match_second)
            })
        };

        let options = ArrowReaderOptions::new().with_page_index(true);
        let predicate = make_predicate(filter_mask.clone());

        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(data.clone(), options)
            .unwrap()
            .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
            .with_row_selection_policy(RowSelectionPolicy::Auto { threshold: 32 })
            .with_batch_size(12)
            .build()
            .unwrap();

        let schema = reader.schema().clone();
        let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
        let result = concat_batches(&schema, &batches).unwrap();
        assert_eq!(result.num_rows(), 2);
    }

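    // Round-trips data through ArrowWriter with bloom filters enabled and
    // checks that Sbbf lookups hit for a stored value and miss for an absent
    // one; the second test reads a reference file whose column metadata lacks
    // bloom_filter_length.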
    #[test]
    fn test_get_row_group_column_bloom_filter_with_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let file = File::open(path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
        let schema = builder.schema().clone();
        let reader = builder.build().unwrap();

        let mut parquet_data = Vec::new();
        let props = WriterProperties::builder()
            .set_bloom_filter_enabled(true)
            .build();
        let mut writer = ArrowWriter::try_new(&mut parquet_data, schema, Some(props)).unwrap();
        for batch in reader {
            let batch = batch.unwrap();
            writer.write(&batch).unwrap();
        }
        writer.close().unwrap();

        test_get_row_group_column_bloom_filter(parquet_data.into(), true);
    }

    #[test]
    fn test_get_row_group_column_bloom_filter_without_length() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
        let data = Bytes::from(std::fs::read(path).unwrap());
        test_get_row_group_column_bloom_filter(data, false);
    }

    fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
        let builder = ParquetRecordBatchReaderBuilder::try_new(data.clone()).unwrap();

        let metadata = builder.metadata();
        assert_eq!(metadata.num_row_groups(), 1);
        let row_group = metadata.row_group(0);
        let column = row_group.column(0);
        assert_eq!(column.bloom_filter_length().is_some(), with_length);

        let sbbf = builder
            .get_row_group_column_bloom_filter(0, 0)
            .unwrap()
            .unwrap();
        assert!(sbbf.check(&"Hello"));
        assert!(!sbbf.check(&"Hello_Not_Exists"));
    }

    #[test]
    fn test_read_unknown_logical_type() {
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/unknown-logical-type.parquet");
        let test_file = File::open(path).unwrap();

        let builder = ParquetRecordBatchReaderBuilder::try_new(test_file)
            .expect("Error creating reader builder");

        let schema = builder.metadata().file_metadata().schema_descr();
        assert_eq!(
            schema.column(0).logical_type_ref(),
            Some(&LogicalType::String)
        );
        assert_eq!(
            schema.column(1).logical_type_ref(),
            Some(&LogicalType::_Unknown { field_id: 2555 })
        );
        assert_eq!(schema.column(1).physical_type(), PhysicalType::BYTE_ARRAY);

        let mut reader = builder.build().unwrap();
        let out = reader.next().unwrap().unwrap();
        assert_eq!(out.num_rows(), 3);
        assert_eq!(out.num_columns(), 2);
    }

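    // The following tests cover virtual columns: a row_number extension field
    // synthesized by the reader rather than decoded from the file.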
    #[test]
    fn test_read_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let supplied_fields = Fields::from(vec![Field::new("value", ArrowDataType::Int64, false)]);

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let options = ArrowReaderOptions::new()
            .with_schema(Arc::new(Schema::new(supplied_fields)))
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
            file.try_clone().unwrap(),
            options,
        )
        .expect("reader builder with schema")
        .build()
        .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![
            Field::new("value", ArrowDataType::Int64, false),
            (*row_number_field).clone(),
        ]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(1), Some(2), Some(3)]
        );
        assert_eq!(
            batch
                .column(1)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

    #[test]
    fn test_read_only_row_numbers() {
        let file = write_parquet_from_iter(vec![(
            "value",
            Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef,
        )]);
        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );
        let options = ArrowReaderOptions::new()
            .with_virtual_columns(vec![row_number_field.clone()])
            .unwrap();
        let metadata = ArrowReaderMetadata::load(&file, options).unwrap();
        let num_columns = metadata
            .metadata
            .file_metadata()
            .schema_descr()
            .num_columns();

        let mut arrow_reader = ParquetRecordBatchReaderBuilder::new_with_metadata(file, metadata)
            .with_projection(ProjectionMask::none(num_columns))
            .build()
            .expect("reader with schema");

        let batch = arrow_reader.next().unwrap().unwrap();
        let schema = Arc::new(Schema::new(vec![row_number_field]));

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), 1);
        assert_eq!(batch.num_rows(), 3);
        assert_eq!(
            batch
                .column(0)
                .as_primitive::<types::Int64Type>()
                .iter()
                .collect::<Vec<_>>(),
            vec![Some(0), Some(1), Some(2)]
        );
    }

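    // Row numbers reflect a row's position in the file, so reading the row
    // groups out of order must reorder the emitted row numbers accordingly.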
    #[test]
    fn test_read_row_numbers_row_group_order() -> Result<()> {
        let array = Int64Array::from_iter_values(5000..5100);
        let batch = RecordBatch::try_from_iter([("col", Arc::new(array) as ArrayRef)])?;
        let mut buffer = Vec::new();
        let options = WriterProperties::builder()
            .set_max_row_group_size(50)
            .build();
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema().clone(), Some(options))?;
        for batch_chunk in (0..10).map(|i| batch.slice(i * 10, 10)) {
            writer.write(&batch_chunk)?;
        }
        writer.close()?;

        let row_number_field = Arc::new(
            Field::new("row_number", ArrowDataType::Int64, false).with_extension_type(RowNumber),
        );

        let buffer = Bytes::from(buffer);

        let options =
            ArrowReaderOptions::new().with_virtual_columns(vec![row_number_field.clone()])?;

        let arrow_reader =
            ParquetRecordBatchReaderBuilder::try_new_with_options(buffer.clone(), options.clone())?
                .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5000..5100).collect(),
                row_numbers: (0..100).collect()
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        let arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(buffer, options)?
            .with_row_groups(vec![1, 0])
            .build()?;

        assert_eq!(
            ValuesAndRowNumbers {
                values: (5050..5100).chain(5000..5050).collect(),
                row_numbers: (50..100).chain(0..50).collect(),
            },
            ValuesAndRowNumbers::new_from_reader(arrow_reader)
        );

        Ok(())
    }

    #[derive(Debug, PartialEq)]
    struct ValuesAndRowNumbers {
        values: Vec<i64>,
        row_numbers: Vec<i64>,
    }

    impl ValuesAndRowNumbers {
        fn new_from_reader(reader: ParquetRecordBatchReader) -> Self {
            let mut values = vec![];
            let mut row_numbers = vec![];
            for batch in reader {
                let batch = batch.expect("Could not read batch");
                values.extend(
                    batch
                        .column_by_name("col")
                        .expect("Could not get col column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get value")),
                );

                row_numbers.extend(
                    batch
                        .column_by_name("row_number")
                        .expect("Could not get row_number column")
                        .as_primitive::<arrow::datatypes::Int64Type>()
                        .iter()
                        .map(|v| v.expect("Could not get row number")),
                );
            }
            Self {
                values,
                row_numbers,
            }
        }
    }

    #[test]
    fn test_with_virtual_columns_rejects_non_virtual_fields() {
        let regular_field = Arc::new(Field::new("regular_column", ArrowDataType::Int64, false));
        assert_eq!(
            ArrowReaderOptions::new()
                .with_virtual_columns(vec![regular_field])
                .unwrap_err()
                .to_string(),
            "Parquet error: Field 'regular_column' is not a virtual column. Virtual columns must have extension type names starting with 'arrow.virtual.'"
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups() {
        test_row_numbers_with_multiple_row_groups_helper(
            false,
            |path, selection, _row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

    #[test]
    fn test_row_numbers_with_multiple_row_groups_and_filter() {
        test_row_numbers_with_multiple_row_groups_helper(
            true,
            |path, selection, row_filter, batch_size| {
                let file = File::open(path).unwrap();
                let row_number_field = Arc::new(
                    Field::new("row_number", ArrowDataType::Int64, false)
                        .with_extension_type(RowNumber),
                );
                let options = ArrowReaderOptions::new()
                    .with_virtual_columns(vec![row_number_field])
                    .unwrap();
                let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
                    .unwrap()
                    .with_row_selection(selection)
                    .with_batch_size(batch_size)
                    .with_row_filter(row_filter.expect("No filter"))
                    .build()
                    .expect("Could not create reader");
                reader
                    .collect::<Result<Vec<_>, _>>()
                    .expect("Could not read")
            },
        );
    }

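    // Shared harness: writes a file whose values equal their row number plus
    // one, applies a random RowSelection (and optionally a random RowFilter),
    // then verifies row_number + 1 == value for every surviving row.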
    pub(crate) fn test_row_numbers_with_multiple_row_groups_helper<F>(
        use_filter: bool,
        test_case: F,
    ) where
        F: FnOnce(PathBuf, RowSelection, Option<RowFilter>, usize) -> Vec<RecordBatch>,
    {
        let seed: u64 = random();
        println!("test_row_numbers_with_multiple_row_groups seed: {seed}");
        let mut rng = StdRng::seed_from_u64(seed);

        use tempfile::TempDir;
        let tempdir = TempDir::new().expect("Could not create temp dir");

        let (bytes, metadata) = generate_file_with_row_numbers(&mut rng);

        let path = tempdir.path().join("test.parquet");
        std::fs::write(&path, bytes).expect("Could not write file");

        let mut case = vec![];
        let mut remaining = metadata.file_metadata().num_rows();
        while remaining > 0 {
            let row_count = rng.random_range(1..=remaining);
            remaining -= row_count;
            case.push(RowSelector {
                row_count: row_count as usize,
                skip: rng.random_bool(0.5),
            });
        }

        let filter = use_filter.then(|| {
            let filter = (0..metadata.file_metadata().num_rows())
                .map(|_| rng.random_bool(0.99))
                .collect::<Vec<_>>();
            let mut filter_offset = 0;
            RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
                ProjectionMask::all(),
                move |b| {
                    let array = BooleanArray::from_iter(
                        filter
                            .iter()
                            .skip(filter_offset)
                            .take(b.num_rows())
                            .map(|x| Some(*x)),
                    );
                    filter_offset += b.num_rows();
                    Ok(array)
                },
            ))])
        });

        let selection = RowSelection::from(case);
        let batches = test_case(path, selection.clone(), filter, rng.random_range(1..4096));

        if selection.skipped_row_count() == metadata.file_metadata().num_rows() as usize {
            assert!(batches.into_iter().all(|batch| batch.num_rows() == 0));
            return;
        }
        let actual = concat_batches(batches.first().expect("No batches").schema_ref(), &batches)
            .expect("Failed to concatenate");
        let values = actual
            .column(0)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        let row_numbers = actual
            .column(1)
            .as_primitive::<types::Int64Type>()
            .iter()
            .collect::<Vec<_>>();
        assert_eq!(
            row_numbers
                .into_iter()
                .map(|number| number.map(|number| number + 1))
                .collect::<Vec<_>>(),
            values
        );
    }

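    // Emits between 1 and 4095 sequential values starting at 1, flushing after
    // each randomly sized batch so the file contains multiple row groups.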
    fn generate_file_with_row_numbers(rng: &mut impl Rng) -> (Bytes, ParquetMetaData) {
        let schema = Arc::new(Schema::new(Fields::from(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )])));

        let mut buf = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buf, schema.clone(), None).expect("Could not create writer");

        let mut values = 1..=rng.random_range(1..4096);
        while !values.is_empty() {
            let batch_values = values
                .by_ref()
                .take(rng.random_range(1..4096))
                .collect::<Vec<_>>();
            let array = Arc::new(Int64Array::from(batch_values)) as ArrayRef;
            let batch =
                RecordBatch::try_from_iter([("value", array)]).expect("Could not create batch");
            writer.write(&batch).expect("Could not write batch");
            writer.flush().expect("Could not flush");
        }
        let metadata = writer.close().expect("Could not close writer");

        (Bytes::from(buf), metadata)
    }
}