1use std::collections::VecDeque;
21use std::sync::Arc;
22
23use arrow_array::cast::AsArray;
24use arrow_array::Array;
25use arrow_array::{RecordBatch, RecordBatchReader};
26use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
27use arrow_select::filter::prep_null_mask_filter;
28pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
29pub use selection::{RowSelection, RowSelector};
30
31pub use crate::arrow::array_reader::RowGroups;
32use crate::arrow::array_reader::{build_array_reader, ArrayReader};
33use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
34use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
35use crate::column::page::{PageIterator, PageReader};
36use crate::errors::{ParquetError, Result};
37use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
38use crate::file::reader::{ChunkReader, SerializedPageReader};
39use crate::schema::types::SchemaDescriptor;
40
41mod filter;
42mod selection;
43pub mod statistics;
44
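/// A generic builder for reading Parquet data into Arrow [`RecordBatch`]es.
///
/// The type parameter `T` is the underlying input, e.g. the [`SyncReader`] used by
/// [`ParquetRecordBatchReaderBuilder`]. The builder collects the batch size, row group
/// selection, column projection, row selection, row filter and optional limit/offset
/// that are applied when the reader is built.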
45pub struct ArrowReaderBuilder<T> {
54 pub(crate) input: T,
55
56 pub(crate) metadata: Arc<ParquetMetaData>,
57
58 pub(crate) schema: SchemaRef,
59
60 pub(crate) fields: Option<Arc<ParquetField>>,
61
62 pub(crate) batch_size: usize,
63
64 pub(crate) row_groups: Option<Vec<usize>>,
65
66 pub(crate) projection: ProjectionMask,
67
68 pub(crate) filter: Option<RowFilter>,
69
70 pub(crate) selection: Option<RowSelection>,
71
72 pub(crate) limit: Option<usize>,
73
74 pub(crate) offset: Option<usize>,
75}
76
77impl<T> ArrowReaderBuilder<T> {
78 pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
79 Self {
80 input,
81 metadata: metadata.metadata,
82 schema: metadata.schema,
83 fields: metadata.fields,
84 batch_size: 1024,
85 row_groups: None,
86 projection: ProjectionMask::all(),
87 filter: None,
88 selection: None,
89 limit: None,
90 offset: None,
91 }
92 }
93
94 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
96 &self.metadata
97 }
98
99 pub fn parquet_schema(&self) -> &SchemaDescriptor {
101 self.metadata.file_metadata().schema_descr()
102 }
103
104 pub fn schema(&self) -> &SchemaRef {
106 &self.schema
107 }
108
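    /// Set the maximum number of rows returned in each [`RecordBatch`].
    ///
    /// The value is clamped to the total number of rows in the file, so an overly
    /// large batch size does not cause unnecessarily large buffer allocations.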
109 pub fn with_batch_size(self, batch_size: usize) -> Self {
112 let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
114 Self { batch_size, ..self }
115 }
116
117 pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
121 Self {
122 row_groups: Some(row_groups),
123 ..self
124 }
125 }
126
127 pub fn with_projection(self, mask: ProjectionMask) -> Self {
129 Self {
130 projection: mask,
131 ..self
132 }
133 }
134
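    /// Restrict the rows decoded to the given [`RowSelection`].
    ///
    /// The selection is applied to the rows of the row groups that will be read and is
    /// combined with any [`RowFilter`] when the reader is built.
    ///
    /// A small illustrative sketch of a selection that skips the first 100 rows and then
    /// selects the next 100:
    ///
    /// ```
    /// use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
    ///
    /// let selection = RowSelection::from(vec![
    ///     RowSelector::skip(100),
    ///     RowSelector::select(100),
    /// ]);
    /// ```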
135 pub fn with_row_selection(self, selection: RowSelection) -> Self {
195 Self {
196 selection: Some(selection),
197 ..self
198 }
199 }
200
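    /// Provide a [`RowFilter`] to skip decoding rows that fail its predicates.
    ///
    /// When the reader is built, the predicates are evaluated in order, each reading only
    /// the columns in its [`ProjectionMask`], and only rows passing every predicate are
    /// decoded for the final projection.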
201 pub fn with_row_filter(self, filter: RowFilter) -> Self {
208 Self {
209 filter: Some(filter),
210 ..self
211 }
212 }
213
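    /// Limit the number of rows returned by the built reader.
    ///
    /// The limit is applied after any row selection, row filter and offset, i.e. it
    /// counts only rows that would otherwise be yielded.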
214 pub fn with_limit(self, limit: usize) -> Self {
222 Self {
223 limit: Some(limit),
224 ..self
225 }
226 }
227
228 pub fn with_offset(self, offset: usize) -> Self {
236 Self {
237 offset: Some(offset),
238 ..self
239 }
240 }
241}
242
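/// Options controlling how Parquet metadata is read and mapped to an Arrow schema.
///
/// * `skip_arrow_metadata` ignores any Arrow schema embedded in the file's key-value metadata
/// * `supplied_schema` uses the provided Arrow schema instead of inferring one from the file
/// * `page_index` additionally reads the Parquet page index, if present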
243#[derive(Debug, Clone, Default)]
248pub struct ArrowReaderOptions {
249 skip_arrow_metadata: bool,
251 supplied_schema: Option<SchemaRef>,
253 pub(crate) page_index: bool,
255}
256
257impl ArrowReaderOptions {
258 pub fn new() -> Self {
260 Self::default()
261 }
262
263 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
270 Self {
271 skip_arrow_metadata,
272 ..self
273 }
274 }
275
276 pub fn with_schema(self, schema: SchemaRef) -> Self {
323 Self {
324 supplied_schema: Some(schema),
325 skip_arrow_metadata: true,
326 ..self
327 }
328 }
329
330 pub fn with_page_index(self, page_index: bool) -> Self {
343 Self { page_index, ..self }
344 }
345}
346
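/// The decoded [`ParquetMetaData`] for a file together with the Arrow schema derived from it.
///
/// Loading this once via [`ArrowReaderMetadata::load`] allows the same metadata to be reused
/// when constructing multiple readers over the same file.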
347#[derive(Debug, Clone)]
362pub struct ArrowReaderMetadata {
363 pub(crate) metadata: Arc<ParquetMetaData>,
365 pub(crate) schema: SchemaRef,
367
368 pub(crate) fields: Option<Arc<ParquetField>>,
369}
370
371impl ArrowReaderMetadata {
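    /// Read the footer metadata from `reader` and convert it according to `options`.
    ///
    /// If `options.page_index` is set, the page index is also read and attached to the
    /// returned metadata.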
372 pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
383 let metadata = ParquetMetaDataReader::new()
384 .with_page_indexes(options.page_index)
385 .parse_and_finish(reader)?;
386 Self::try_new(Arc::new(metadata), options)
387 }
388
389 pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
396 match options.supplied_schema {
            Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema),
398 None => {
399 let kv_metadata = match options.skip_arrow_metadata {
400 true => None,
401 false => metadata.file_metadata().key_value_metadata(),
402 };
403
404 let (schema, fields) = parquet_to_arrow_schema_and_fields(
405 metadata.file_metadata().schema_descr(),
406 ProjectionMask::all(),
407 kv_metadata,
408 )?;
409
410 Ok(Self {
411 metadata,
412 schema: Arc::new(schema),
413 fields: fields.map(Arc::new),
414 })
415 }
416 }
417 }
418
419 fn with_supplied_schema(
420 metadata: Arc<ParquetMetaData>,
421 supplied_schema: SchemaRef,
422 ) -> Result<Self> {
423 let parquet_schema = metadata.file_metadata().schema_descr();
424 let field_levels = parquet_to_arrow_field_levels(
425 parquet_schema,
426 ProjectionMask::all(),
427 Some(supplied_schema.fields()),
428 )?;
429 let fields = field_levels.fields;
430 let inferred_len = fields.len();
431 let supplied_len = supplied_schema.fields().len();
432 if inferred_len != supplied_len {
436 Err(arrow_err!(format!(
437 "incompatible arrow schema, expected {} columns received {}",
438 inferred_len, supplied_len
439 )))
440 } else {
441 let diff_fields: Vec<_> = supplied_schema
442 .fields()
443 .iter()
444 .zip(fields.iter())
445 .filter_map(|(field1, field2)| {
446 if field1 != field2 {
447 Some(field1.name().clone())
448 } else {
449 None
450 }
451 })
452 .collect();
453
454 if !diff_fields.is_empty() {
455 Err(ParquetError::ArrowError(format!(
456 "incompatible arrow schema, the following fields could not be cast: [{}]",
457 diff_fields.join(", ")
458 )))
459 } else {
460 Ok(Self {
461 metadata,
462 schema: supplied_schema,
463 fields: field_levels.levels.map(Arc::new),
464 })
465 }
466 }
467 }
468
469 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
471 &self.metadata
472 }
473
474 pub fn parquet_schema(&self) -> &SchemaDescriptor {
476 self.metadata.file_metadata().schema_descr()
477 }
478
479 pub fn schema(&self) -> &SchemaRef {
481 &self.schema
482 }
483}
484
485#[doc(hidden)]
486pub struct SyncReader<T: ChunkReader>(T);
488
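/// A synchronous builder for [`ParquetRecordBatchReader`], reading from any [`ChunkReader`]
/// such as a `File` or `Bytes`.
///
/// A minimal usage sketch (illustrative only; the `read_all` helper is not part of this API),
/// assuming `data` holds the bytes of a complete Parquet file:
///
/// ```no_run
/// use bytes::Bytes;
/// use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
///
/// fn read_all(data: Bytes) -> Result<(), Box<dyn std::error::Error>> {
///     let builder = ParquetRecordBatchReaderBuilder::try_new(data)?;
///     let reader = builder.with_batch_size(8192).build()?;
///     for batch in reader {
///         println!("read {} rows", batch?.num_rows());
///     }
///     Ok(())
/// }
/// ```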
489pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
495
496impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
497 pub fn try_new(reader: T) -> Result<Self> {
526 Self::try_new_with_options(reader, Default::default())
527 }
528
529 pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
531 let metadata = ArrowReaderMetadata::load(&reader, options)?;
532 Ok(Self::new_with_metadata(reader, metadata))
533 }
534
535 pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
572 Self::new_builder(SyncReader(input), metadata)
573 }
574
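    /// Consume the builder and construct a [`ParquetRecordBatchReader`].
    ///
    /// Any [`RowFilter`] predicates are evaluated here, narrowing the row selection before
    /// the array reader for the final projection is built; the offset and limit, if any,
    /// are then applied to the resulting selection.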
575 pub fn build(self) -> Result<ParquetRecordBatchReader> {
579 let batch_size = self
581 .batch_size
582 .min(self.metadata.file_metadata().num_rows() as usize);
583
584 let row_groups = self
585 .row_groups
586 .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());
587
588 let reader = ReaderRowGroups {
589 reader: Arc::new(self.input.0),
590 metadata: self.metadata,
591 row_groups,
592 };
593
594 let mut filter = self.filter;
595 let mut selection = self.selection;
596
597 if let Some(filter) = filter.as_mut() {
598 for predicate in filter.predicates.iter_mut() {
599 if !selects_any(selection.as_ref()) {
600 break;
601 }
602
603 let array_reader =
604 build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;
605
606 selection = Some(evaluate_predicate(
607 batch_size,
608 array_reader,
609 selection,
610 predicate.as_mut(),
611 )?);
612 }
613 }
614
615 let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;
616
617 if !selects_any(selection.as_ref()) {
619 selection = Some(RowSelection::from(vec![]));
620 }
621
622 Ok(ParquetRecordBatchReader::new(
623 batch_size,
624 array_reader,
625 apply_range(selection, reader.num_rows(), self.offset, self.limit),
626 ))
627 }
628}
629
630struct ReaderRowGroups<T: ChunkReader> {
631 reader: Arc<T>,
632
633 metadata: Arc<ParquetMetaData>,
634 row_groups: Vec<usize>,
636}
637
638impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
639 fn num_rows(&self) -> usize {
640 let meta = self.metadata.row_groups();
641 self.row_groups
642 .iter()
643 .map(|x| meta[*x].num_rows() as usize)
644 .sum()
645 }
646
647 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
648 Ok(Box::new(ReaderPageIterator {
649 column_idx: i,
650 reader: self.reader.clone(),
651 metadata: self.metadata.clone(),
652 row_groups: self.row_groups.clone().into_iter(),
653 }))
654 }
655}
656
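/// Iterates the selected row groups, yielding a [`PageReader`] for a single column chunk
/// in each of them.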
657struct ReaderPageIterator<T: ChunkReader> {
658 reader: Arc<T>,
659 column_idx: usize,
660 row_groups: std::vec::IntoIter<usize>,
661 metadata: Arc<ParquetMetaData>,
662}
663
664impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
665 type Item = Result<Box<dyn PageReader>>;
666
667 fn next(&mut self) -> Option<Self::Item> {
668 let rg_idx = self.row_groups.next()?;
669 let rg = self.metadata.row_group(rg_idx);
670 let meta = rg.column(self.column_idx);
671 let offset_index = self.metadata.offset_index();
672 let page_locations = offset_index
675 .filter(|i| !i[rg_idx].is_empty())
676 .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
677 let total_rows = rg.num_rows() as usize;
678 let reader = self.reader.clone();
679
680 let ret = SerializedPageReader::new(reader, meta, total_rows, page_locations);
681 Some(ret.map(|x| Box::new(x) as _))
682 }
683}
684
685impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
686
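/// An [`Iterator`] of [`RecordBatch`] decoded from Parquet data.
///
/// Each call to `next` decodes up to `batch_size` rows, honoring the optional row
/// selection, and the iterator is exhausted once all selected rows have been read.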
687pub struct ParquetRecordBatchReader {
690 batch_size: usize,
691 array_reader: Box<dyn ArrayReader>,
692 schema: SchemaRef,
693 selection: Option<VecDeque<RowSelector>>,
694}
695
696impl Iterator for ParquetRecordBatchReader {
697 type Item = Result<RecordBatch, ArrowError>;
698
699 fn next(&mut self) -> Option<Self::Item> {
700 let mut read_records = 0;
701 match self.selection.as_mut() {
702 Some(selection) => {
703 while read_records < self.batch_size && !selection.is_empty() {
704 let front = selection.pop_front().unwrap();
705 if front.skip {
706 let skipped = match self.array_reader.skip_records(front.row_count) {
707 Ok(skipped) => skipped,
708 Err(e) => return Some(Err(e.into())),
709 };
710
711 if skipped != front.row_count {
712 return Some(Err(general_err!(
713 "failed to skip rows, expected {}, got {}",
714 front.row_count,
715 skipped
716 )
717 .into()));
718 }
719 continue;
720 }
721
722 if front.row_count == 0 {
725 continue;
726 }
727
728 let need_read = self.batch_size - read_records;
730 let to_read = match front.row_count.checked_sub(need_read) {
731 Some(remaining) if remaining != 0 => {
732 selection.push_front(RowSelector::select(remaining));
735 need_read
736 }
737 _ => front.row_count,
738 };
739 match self.array_reader.read_records(to_read) {
740 Ok(0) => break,
741 Ok(rec) => read_records += rec,
742 Err(error) => return Some(Err(error.into())),
743 }
744 }
745 }
746 None => {
747 if let Err(error) = self.array_reader.read_records(self.batch_size) {
748 return Some(Err(error.into()));
749 }
750 }
751 };
752
753 match self.array_reader.consume_batch() {
754 Err(error) => Some(Err(error.into())),
755 Ok(array) => {
756 let struct_array = array.as_struct_opt().ok_or_else(|| {
757 ArrowError::ParquetError(
758 "Struct array reader should return struct array".to_string(),
759 )
760 });
761
762 match struct_array {
763 Err(err) => Some(Err(err)),
                    Ok(e) => (!e.is_empty()).then(|| Ok(RecordBatch::from(e))),
765 }
766 }
767 }
768 }
769}
770
771impl RecordBatchReader for ParquetRecordBatchReader {
772 fn schema(&self) -> SchemaRef {
777 self.schema.clone()
778 }
779}
780
781impl ParquetRecordBatchReader {
782 pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
786 ParquetRecordBatchReaderBuilder::try_new(reader)?
787 .with_batch_size(batch_size)
788 .build()
789 }
790
791 pub fn try_new_with_row_groups(
796 levels: &FieldLevels,
797 row_groups: &dyn RowGroups,
798 batch_size: usize,
799 selection: Option<RowSelection>,
800 ) -> Result<Self> {
801 let array_reader =
802 build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
803
804 Ok(Self {
805 batch_size,
806 array_reader,
807 schema: Arc::new(Schema::new(levels.fields.clone())),
808 selection: selection.map(|s| s.trim().into()),
809 })
810 }
811
812 pub(crate) fn new(
816 batch_size: usize,
817 array_reader: Box<dyn ArrayReader>,
818 selection: Option<RowSelection>,
819 ) -> Self {
820 let schema = match array_reader.get_data_type() {
821 ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
822 _ => unreachable!("Struct array reader's data type is not struct!"),
823 };
824
825 Self {
826 batch_size,
827 array_reader,
828 schema: Arc::new(schema),
829 selection: selection.map(|s| s.trim().into()),
830 }
831 }
832}
833
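/// Returns `true` if `selection` is `None` or selects at least one row.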
834pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
836 selection.map(|x| x.selects_any()).unwrap_or(true)
837}
838
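/// Applies an optional `offset` and `limit` to `selection`, skipping the first `offset`
/// selected rows and then keeping at most `limit` of the remaining selected rows out of
/// `row_count` total rows.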
839pub(crate) fn apply_range(
841 mut selection: Option<RowSelection>,
842 row_count: usize,
843 offset: Option<usize>,
844 limit: Option<usize>,
845) -> Option<RowSelection> {
846 if let Some(offset) = offset {
848 selection = Some(match row_count.checked_sub(offset) {
849 None => RowSelection::from(vec![]),
850 Some(remaining) => selection
851 .map(|selection| selection.offset(offset))
852 .unwrap_or_else(|| {
853 RowSelection::from(vec![
854 RowSelector::skip(offset),
855 RowSelector::select(remaining),
856 ])
857 }),
858 });
859 }
860
861 if let Some(limit) = limit {
863 selection = Some(
864 selection
865 .map(|selection| selection.limit(limit))
866 .unwrap_or_else(|| {
867 RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
868 }),
869 );
870 }
871 selection
872}
873
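/// Evaluates `predicate` against the rows selected by `input_selection`, reading them with
/// `array_reader` in batches of `batch_size`.
///
/// The boolean arrays returned by the predicate are converted into a [`RowSelection`] and
/// composed with `input_selection`, if any, so the returned selection refers to the same
/// row coordinates as the input and can be passed to the next predicate or the final read.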
874pub(crate) fn evaluate_predicate(
885 batch_size: usize,
886 array_reader: Box<dyn ArrayReader>,
887 input_selection: Option<RowSelection>,
888 predicate: &mut dyn ArrowPredicate,
889) -> Result<RowSelection> {
890 let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
891 let mut filters = vec![];
892 for maybe_batch in reader {
893 let maybe_batch = maybe_batch?;
894 let input_rows = maybe_batch.num_rows();
895 let filter = predicate.evaluate(maybe_batch)?;
896 if filter.len() != input_rows {
898 return Err(arrow_err!(
899 "ArrowPredicate predicate returned {} rows, expected {input_rows}",
900 filter.len()
901 ));
902 }
903 match filter.null_count() {
904 0 => filters.push(filter),
905 _ => filters.push(prep_null_mask_filter(&filter)),
906 };
907 }
908
909 let raw = RowSelection::from_filters(&filters);
910 Ok(match input_selection {
911 Some(selection) => selection.and_then(&raw),
912 None => raw,
913 })
914}
915
916#[cfg(test)]
917mod tests {
918 use std::cmp::min;
919 use std::collections::{HashMap, VecDeque};
920 use std::fmt::Formatter;
921 use std::fs::File;
922 use std::io::Seek;
923 use std::path::PathBuf;
924 use std::sync::Arc;
925
926 use bytes::Bytes;
927 use half::f16;
928 use num::PrimInt;
929 use rand::{thread_rng, Rng, RngCore};
930 use tempfile::tempfile;
931
932 use arrow_array::builder::*;
933 use arrow_array::cast::AsArray;
934 use arrow_array::types::{
935 Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
936 Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
937 };
938 use arrow_array::*;
939 use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
940 use arrow_data::{ArrayData, ArrayDataBuilder};
941 use arrow_schema::{
942 ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
943 };
944 use arrow_select::concat::concat_batches;
945
946 use crate::arrow::arrow_reader::{
947 ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
948 ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
949 };
950 use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
951 use crate::arrow::{ArrowWriter, ProjectionMask};
952 use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
953 use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
954 use crate::data_type::{
955 BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
956 FloatType, Int32Type, Int64Type, Int96Type,
957 };
958 use crate::errors::Result;
959 use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
960 use crate::file::writer::SerializedFileWriter;
961 use crate::schema::parser::parse_message_type;
962 use crate::schema::types::{Type, TypePtr};
963 use crate::util::test_common::rand_gen::RandGen;
964
965 #[test]
966 fn test_arrow_reader_all_columns() {
967 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
968
969 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
970 let original_schema = Arc::clone(builder.schema());
971 let reader = builder.build().unwrap();
972
973 assert_eq!(original_schema.fields(), reader.schema().fields());
975 }
976
977 #[test]
978 fn test_arrow_reader_single_column() {
979 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
980
981 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
982 let original_schema = Arc::clone(builder.schema());
983
984 let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
985 let reader = builder.with_projection(mask).build().unwrap();
986
987 assert_eq!(1, reader.schema().fields().len());
989 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
990 }
991
992 #[test]
993 fn test_arrow_reader_single_column_by_name() {
994 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
995
996 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
997 let original_schema = Arc::clone(builder.schema());
998
999 let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1000 let reader = builder.with_projection(mask).build().unwrap();
1001
1002 assert_eq!(1, reader.schema().fields().len());
1004 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1005 }
1006
1007 #[test]
1008 fn test_null_column_reader_test() {
1009 let mut file = tempfile::tempfile().unwrap();
1010
1011 let schema = "
1012 message message {
1013 OPTIONAL INT32 int32;
1014 }
1015 ";
1016 let schema = Arc::new(parse_message_type(schema).unwrap());
1017
1018 let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1019 generate_single_column_file_with_data::<Int32Type>(
1020 &[vec![], vec![]],
1021 Some(&def_levels),
            file.try_clone().unwrap(),
            schema,
1024 Some(Field::new("int32", ArrowDataType::Null, true)),
1025 &Default::default(),
1026 )
1027 .unwrap();
1028
1029 file.rewind().unwrap();
1030
1031 let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1032 let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1033
1034 assert_eq!(batches.len(), 4);
1035 for batch in &batches[0..3] {
1036 assert_eq!(batch.num_rows(), 2);
1037 assert_eq!(batch.num_columns(), 1);
1038 assert_eq!(batch.column(0).null_count(), 2);
1039 }
1040
1041 assert_eq!(batches[3].num_rows(), 1);
1042 assert_eq!(batches[3].num_columns(), 1);
1043 assert_eq!(batches[3].column(0).null_count(), 1);
1044 }
1045
1046 #[test]
1047 fn test_primitive_single_column_reader_test() {
1048 run_single_column_reader_tests::<BoolType, _, BoolType>(
1049 2,
1050 ConvertedType::NONE,
1051 None,
1052 |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1053 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1054 );
1055 run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1056 2,
1057 ConvertedType::NONE,
1058 None,
1059 |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1060 &[
1061 Encoding::PLAIN,
1062 Encoding::RLE_DICTIONARY,
1063 Encoding::DELTA_BINARY_PACKED,
1064 Encoding::BYTE_STREAM_SPLIT,
1065 ],
1066 );
1067 run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1068 2,
1069 ConvertedType::NONE,
1070 None,
1071 |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1072 &[
1073 Encoding::PLAIN,
1074 Encoding::RLE_DICTIONARY,
1075 Encoding::DELTA_BINARY_PACKED,
1076 Encoding::BYTE_STREAM_SPLIT,
1077 ],
1078 );
1079 run_single_column_reader_tests::<FloatType, _, FloatType>(
1080 2,
1081 ConvertedType::NONE,
1082 None,
1083 |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1084 &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1085 );
1086 }
1087
1088 #[test]
1089 fn test_unsigned_primitive_single_column_reader_test() {
1090 run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1091 2,
1092 ConvertedType::UINT_32,
1093 Some(ArrowDataType::UInt32),
1094 |vals| {
1095 Arc::new(UInt32Array::from_iter(
1096 vals.iter().map(|x| x.map(|x| x as u32)),
1097 ))
1098 },
1099 &[
1100 Encoding::PLAIN,
1101 Encoding::RLE_DICTIONARY,
1102 Encoding::DELTA_BINARY_PACKED,
1103 ],
1104 );
1105 run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1106 2,
1107 ConvertedType::UINT_64,
1108 Some(ArrowDataType::UInt64),
1109 |vals| {
1110 Arc::new(UInt64Array::from_iter(
1111 vals.iter().map(|x| x.map(|x| x as u64)),
1112 ))
1113 },
1114 &[
1115 Encoding::PLAIN,
1116 Encoding::RLE_DICTIONARY,
1117 Encoding::DELTA_BINARY_PACKED,
1118 ],
1119 );
1120 }
1121
1122 #[test]
1123 fn test_unsigned_roundtrip() {
1124 let schema = Arc::new(Schema::new(vec![
1125 Field::new("uint32", ArrowDataType::UInt32, true),
1126 Field::new("uint64", ArrowDataType::UInt64, true),
1127 ]));
1128
1129 let mut buf = Vec::with_capacity(1024);
1130 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1131
1132 let original = RecordBatch::try_new(
1133 schema,
1134 vec![
1135 Arc::new(UInt32Array::from_iter_values([
1136 0,
1137 i32::MAX as u32,
1138 u32::MAX,
1139 ])),
1140 Arc::new(UInt64Array::from_iter_values([
1141 0,
1142 i64::MAX as u64,
1143 u64::MAX,
1144 ])),
1145 ],
1146 )
1147 .unwrap();
1148
1149 writer.write(&original).unwrap();
1150 writer.close().unwrap();
1151
1152 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1153 let ret = reader.next().unwrap().unwrap();
1154 assert_eq!(ret, original);
1155
1156 ret.column(0)
1158 .as_any()
1159 .downcast_ref::<UInt32Array>()
1160 .unwrap();
1161
1162 ret.column(1)
1163 .as_any()
1164 .downcast_ref::<UInt64Array>()
1165 .unwrap();
1166 }
1167
1168 #[test]
1169 fn test_float16_roundtrip() -> Result<()> {
1170 let schema = Arc::new(Schema::new(vec![
1171 Field::new("float16", ArrowDataType::Float16, false),
1172 Field::new("float16-nullable", ArrowDataType::Float16, true),
1173 ]));
1174
1175 let mut buf = Vec::with_capacity(1024);
1176 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1177
1178 let original = RecordBatch::try_new(
1179 schema,
1180 vec![
1181 Arc::new(Float16Array::from_iter_values([
1182 f16::EPSILON,
1183 f16::MIN,
1184 f16::MAX,
1185 f16::NAN,
1186 f16::INFINITY,
1187 f16::NEG_INFINITY,
1188 f16::ONE,
1189 f16::NEG_ONE,
1190 f16::ZERO,
1191 f16::NEG_ZERO,
1192 f16::E,
1193 f16::PI,
1194 f16::FRAC_1_PI,
1195 ])),
1196 Arc::new(Float16Array::from(vec![
1197 None,
1198 None,
1199 None,
1200 Some(f16::NAN),
1201 Some(f16::INFINITY),
1202 Some(f16::NEG_INFINITY),
1203 None,
1204 None,
1205 None,
1206 None,
1207 None,
1208 None,
1209 Some(f16::FRAC_1_PI),
1210 ])),
1211 ],
1212 )?;
1213
1214 writer.write(&original)?;
1215 writer.close()?;
1216
1217 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1218 let ret = reader.next().unwrap()?;
1219 assert_eq!(ret, original);
1220
1221 ret.column(0).as_primitive::<Float16Type>();
1223 ret.column(1).as_primitive::<Float16Type>();
1224
1225 Ok(())
1226 }
1227
1228 #[test]
1229 fn test_time_utc_roundtrip() -> Result<()> {
1230 let schema = Arc::new(Schema::new(vec![
1231 Field::new(
1232 "time_millis",
1233 ArrowDataType::Time32(TimeUnit::Millisecond),
1234 true,
1235 )
1236 .with_metadata(HashMap::from_iter(vec![(
1237 "adjusted_to_utc".to_string(),
1238 "".to_string(),
1239 )])),
1240 Field::new(
1241 "time_micros",
1242 ArrowDataType::Time64(TimeUnit::Microsecond),
1243 true,
1244 )
1245 .with_metadata(HashMap::from_iter(vec![(
1246 "adjusted_to_utc".to_string(),
1247 "".to_string(),
1248 )])),
1249 ]));
1250
1251 let mut buf = Vec::with_capacity(1024);
1252 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1253
1254 let original = RecordBatch::try_new(
1255 schema,
1256 vec![
1257 Arc::new(Time32MillisecondArray::from(vec![
1258 Some(-1),
1259 Some(0),
1260 Some(86_399_000),
1261 Some(86_400_000),
1262 Some(86_401_000),
1263 None,
1264 ])),
1265 Arc::new(Time64MicrosecondArray::from(vec![
1266 Some(-1),
1267 Some(0),
1268 Some(86_399 * 1_000_000),
1269 Some(86_400 * 1_000_000),
1270 Some(86_401 * 1_000_000),
1271 None,
1272 ])),
1273 ],
1274 )?;
1275
1276 writer.write(&original)?;
1277 writer.close()?;
1278
1279 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1280 let ret = reader.next().unwrap()?;
1281 assert_eq!(ret, original);
1282
1283 ret.column(0).as_primitive::<Time32MillisecondType>();
1285 ret.column(1).as_primitive::<Time64MicrosecondType>();
1286
1287 Ok(())
1288 }
1289
1290 #[test]
1291 fn test_date32_roundtrip() -> Result<()> {
1292 use arrow_array::Date32Array;
1293
1294 let schema = Arc::new(Schema::new(vec![Field::new(
1295 "date32",
1296 ArrowDataType::Date32,
1297 false,
1298 )]));
1299
1300 let mut buf = Vec::with_capacity(1024);
1301
1302 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1303
1304 let original = RecordBatch::try_new(
1305 schema,
1306 vec![Arc::new(Date32Array::from(vec![
1307 -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1308 ]))],
1309 )?;
1310
1311 writer.write(&original)?;
1312 writer.close()?;
1313
1314 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1315 let ret = reader.next().unwrap()?;
1316 assert_eq!(ret, original);
1317
1318 ret.column(0).as_primitive::<Date32Type>();
1320
1321 Ok(())
1322 }
1323
1324 #[test]
1325 fn test_date64_roundtrip() -> Result<()> {
1326 use arrow_array::Date64Array;
1327
1328 let schema = Arc::new(Schema::new(vec![
1329 Field::new("small-date64", ArrowDataType::Date64, false),
1330 Field::new("big-date64", ArrowDataType::Date64, false),
1331 Field::new("invalid-date64", ArrowDataType::Date64, false),
1332 ]));
1333
1334 let mut default_buf = Vec::with_capacity(1024);
1335 let mut coerce_buf = Vec::with_capacity(1024);
1336
1337 let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1338
1339 let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1340 let mut coerce_writer =
1341 ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1342
1343 static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1344
1345 let original = RecordBatch::try_new(
1346 schema,
1347 vec![
1348 Arc::new(Date64Array::from(vec![
1350 -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1351 -1_000 * NUM_MILLISECONDS_IN_DAY,
1352 0,
1353 1_000 * NUM_MILLISECONDS_IN_DAY,
1354 1_000_000 * NUM_MILLISECONDS_IN_DAY,
1355 ])),
1356 Arc::new(Date64Array::from(vec![
1358 -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1359 -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1360 0,
1361 1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1362 10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1363 ])),
1364 Arc::new(Date64Array::from(vec![
1366 -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1367 -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1368 1,
1369 1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1370 1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1371 ])),
1372 ],
1373 )?;
1374
1375 default_writer.write(&original)?;
1376 coerce_writer.write(&original)?;
1377
1378 default_writer.close()?;
1379 coerce_writer.close()?;
1380
1381 let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1382 let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1383
1384 let default_ret = default_reader.next().unwrap()?;
1385 let coerce_ret = coerce_reader.next().unwrap()?;
1386
1387 assert_eq!(default_ret, original);
1389
1390 assert_eq!(coerce_ret.column(0), original.column(0));
1392 assert_ne!(coerce_ret.column(1), original.column(1));
1393 assert_ne!(coerce_ret.column(2), original.column(2));
1394
1395 default_ret.column(0).as_primitive::<Date64Type>();
1397 coerce_ret.column(0).as_primitive::<Date64Type>();
1398
1399 Ok(())
1400 }
1401 struct RandFixedLenGen {}
1402
1403 impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1404 fn gen(len: i32) -> FixedLenByteArray {
1405 let mut v = vec![0u8; len as usize];
1406 thread_rng().fill_bytes(&mut v);
1407 ByteArray::from(v).into()
1408 }
1409 }
1410
1411 #[test]
1412 fn test_fixed_length_binary_column_reader() {
1413 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1414 20,
1415 ConvertedType::NONE,
1416 None,
1417 |vals| {
1418 let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1419 for val in vals {
1420 match val {
1421 Some(b) => builder.append_value(b).unwrap(),
1422 None => builder.append_null(),
1423 }
1424 }
1425 Arc::new(builder.finish())
1426 },
1427 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1428 );
1429 }
1430
1431 #[test]
1432 fn test_interval_day_time_column_reader() {
1433 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1434 12,
1435 ConvertedType::INTERVAL,
1436 None,
1437 |vals| {
1438 Arc::new(
1439 vals.iter()
1440 .map(|x| {
1441 x.as_ref().map(|b| IntervalDayTime {
1442 days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1443 milliseconds: i32::from_le_bytes(
1444 b.as_ref()[8..12].try_into().unwrap(),
1445 ),
1446 })
1447 })
1448 .collect::<IntervalDayTimeArray>(),
1449 )
1450 },
1451 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1452 );
1453 }
1454
1455 #[test]
1456 fn test_int96_single_column_reader_test() {
1457 let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];
1458 run_single_column_reader_tests::<Int96Type, _, Int96Type>(
1459 2,
1460 ConvertedType::NONE,
1461 None,
1462 |vals| {
1463 Arc::new(TimestampNanosecondArray::from_iter(
1464 vals.iter().map(|x| x.map(|x| x.to_nanos())),
1465 )) as _
1466 },
1467 encodings,
1468 );
1469 }
1470
1471 struct RandUtf8Gen {}
1472
1473 impl RandGen<ByteArrayType> for RandUtf8Gen {
1474 fn gen(len: i32) -> ByteArray {
1475 Int32Type::gen(len).to_string().as_str().into()
1476 }
1477 }
1478
1479 #[test]
1480 fn test_utf8_single_column_reader_test() {
1481 fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
1482 Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
1483 x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
1484 })))
1485 }
1486
1487 let encodings = &[
1488 Encoding::PLAIN,
1489 Encoding::RLE_DICTIONARY,
1490 Encoding::DELTA_LENGTH_BYTE_ARRAY,
1491 Encoding::DELTA_BYTE_ARRAY,
1492 ];
1493
1494 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1495 2,
1496 ConvertedType::NONE,
1497 None,
1498 |vals| {
1499 Arc::new(BinaryArray::from_iter(
1500 vals.iter().map(|x| x.as_ref().map(|x| x.data())),
1501 ))
1502 },
1503 encodings,
1504 );
1505
1506 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1507 2,
1508 ConvertedType::UTF8,
1509 None,
1510 string_converter::<i32>,
1511 encodings,
1512 );
1513
1514 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1515 2,
1516 ConvertedType::UTF8,
1517 Some(ArrowDataType::Utf8),
1518 string_converter::<i32>,
1519 encodings,
1520 );
1521
1522 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1523 2,
1524 ConvertedType::UTF8,
1525 Some(ArrowDataType::LargeUtf8),
1526 string_converter::<i64>,
1527 encodings,
1528 );
1529
1530 let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
1531 for key in &small_key_types {
1532 for encoding in encodings {
1533 let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
1534 opts.encoding = *encoding;
1535
1536 let data_type =
1537 ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1538
1539 single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
1541 opts,
1542 2,
1543 ConvertedType::UTF8,
1544 Some(data_type.clone()),
1545 move |vals| {
1546 let vals = string_converter::<i32>(vals);
1547 arrow::compute::cast(&vals, &data_type).unwrap()
1548 },
1549 );
1550 }
1551 }
1552
1553 let key_types = [
1554 ArrowDataType::Int16,
1555 ArrowDataType::UInt16,
1556 ArrowDataType::Int32,
1557 ArrowDataType::UInt32,
1558 ArrowDataType::Int64,
1559 ArrowDataType::UInt64,
1560 ];
1561
1562 for key in &key_types {
1563 let data_type =
1564 ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));
1565
1566 run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
1567 2,
1568 ConvertedType::UTF8,
1569 Some(data_type.clone()),
1570 move |vals| {
1571 let vals = string_converter::<i32>(vals);
1572 arrow::compute::cast(&vals, &data_type).unwrap()
1573 },
1574 encodings,
1575 );
1576
1577 }
1594 }
1595
1596 #[test]
1597 fn test_decimal_nullable_struct() {
1598 let decimals = Decimal256Array::from_iter_values(
1599 [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
1600 );
1601
1602 let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1603 "decimals",
1604 decimals.data_type().clone(),
1605 false,
1606 )])))
1607 .len(8)
1608 .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1609 .child_data(vec![decimals.into_data()])
1610 .build()
1611 .unwrap();
1612
1613 let written =
1614 RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1615 .unwrap();
1616
1617 let mut buffer = Vec::with_capacity(1024);
1618 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1619 writer.write(&written).unwrap();
1620 writer.close().unwrap();
1621
1622 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1623 .unwrap()
1624 .collect::<Result<Vec<_>, _>>()
1625 .unwrap();
1626
1627 assert_eq!(&written.slice(0, 3), &read[0]);
1628 assert_eq!(&written.slice(3, 3), &read[1]);
1629 assert_eq!(&written.slice(6, 2), &read[2]);
1630 }
1631
1632 #[test]
1633 fn test_int32_nullable_struct() {
1634 let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1635 let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
1636 "int32",
1637 int32.data_type().clone(),
1638 false,
1639 )])))
1640 .len(8)
1641 .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
1642 .child_data(vec![int32.into_data()])
1643 .build()
1644 .unwrap();
1645
1646 let written =
1647 RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
1648 .unwrap();
1649
1650 let mut buffer = Vec::with_capacity(1024);
1651 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1652 writer.write(&written).unwrap();
1653 writer.close().unwrap();
1654
1655 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1656 .unwrap()
1657 .collect::<Result<Vec<_>, _>>()
1658 .unwrap();
1659
1660 assert_eq!(&written.slice(0, 3), &read[0]);
1661 assert_eq!(&written.slice(3, 3), &read[1]);
1662 assert_eq!(&written.slice(6, 2), &read[2]);
1663 }
1664
1665 #[test]
    #[ignore]
    fn test_decimal_list() {
1668 let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
1669
1670 let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
1672 decimals.data_type().clone(),
1673 false,
1674 ))))
1675 .len(7)
1676 .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
1677 .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
1678 .child_data(vec![decimals.into_data()])
1679 .build()
1680 .unwrap();
1681
1682 let written =
1683 RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
1684 .unwrap();
1685
1686 let mut buffer = Vec::with_capacity(1024);
1687 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
1688 writer.write(&written).unwrap();
1689 writer.close().unwrap();
1690
1691 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
1692 .unwrap()
1693 .collect::<Result<Vec<_>, _>>()
1694 .unwrap();
1695
1696 assert_eq!(&written.slice(0, 3), &read[0]);
1697 assert_eq!(&written.slice(3, 3), &read[1]);
1698 assert_eq!(&written.slice(6, 1), &read[2]);
1699 }
1700
1701 #[test]
1702 fn test_read_decimal_file() {
1703 use arrow_array::Decimal128Array;
1704 let testdata = arrow::util::test_util::parquet_test_data();
1705 let file_variants = vec![
1706 ("byte_array", 4),
1707 ("fixed_length", 25),
1708 ("int32", 4),
1709 ("int64", 10),
1710 ];
1711 for (prefix, target_precision) in file_variants {
1712 let path = format!("{testdata}/{prefix}_decimal.parquet");
1713 let file = File::open(path).unwrap();
1714 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1715
1716 let batch = record_reader.next().unwrap().unwrap();
1717 assert_eq!(batch.num_rows(), 24);
1718 let col = batch
1719 .column(0)
1720 .as_any()
1721 .downcast_ref::<Decimal128Array>()
1722 .unwrap();
1723
1724 let expected = 1..25;
1725
1726 assert_eq!(col.precision(), target_precision);
1727 assert_eq!(col.scale(), 2);
1728
1729 for (i, v) in expected.enumerate() {
1730 assert_eq!(col.value(i), v * 100_i128);
1731 }
1732 }
1733 }
1734
1735 #[test]
1736 fn test_read_float16_nonzeros_file() {
1737 use arrow_array::Float16Array;
1738 let testdata = arrow::util::test_util::parquet_test_data();
1739 let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
1741 let file = File::open(path).unwrap();
1742 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1743
1744 let batch = record_reader.next().unwrap().unwrap();
1745 assert_eq!(batch.num_rows(), 8);
1746 let col = batch
1747 .column(0)
1748 .as_any()
1749 .downcast_ref::<Float16Array>()
1750 .unwrap();
1751
1752 let f16_two = f16::ONE + f16::ONE;
1753
1754 assert_eq!(col.null_count(), 1);
1755 assert!(col.is_null(0));
1756 assert_eq!(col.value(1), f16::ONE);
1757 assert_eq!(col.value(2), -f16_two);
1758 assert!(col.value(3).is_nan());
1759 assert_eq!(col.value(4), f16::ZERO);
1760 assert!(col.value(4).is_sign_positive());
1761 assert_eq!(col.value(5), f16::NEG_ONE);
1762 assert_eq!(col.value(6), f16::NEG_ZERO);
1763 assert!(col.value(6).is_sign_negative());
1764 assert_eq!(col.value(7), f16_two);
1765 }
1766
1767 #[test]
1768 fn test_read_float16_zeros_file() {
1769 use arrow_array::Float16Array;
1770 let testdata = arrow::util::test_util::parquet_test_data();
1771 let path = format!("{testdata}/float16_zeros_and_nans.parquet");
1773 let file = File::open(path).unwrap();
1774 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1775
1776 let batch = record_reader.next().unwrap().unwrap();
1777 assert_eq!(batch.num_rows(), 3);
1778 let col = batch
1779 .column(0)
1780 .as_any()
1781 .downcast_ref::<Float16Array>()
1782 .unwrap();
1783
1784 assert_eq!(col.null_count(), 1);
1785 assert!(col.is_null(0));
1786 assert_eq!(col.value(1), f16::ZERO);
1787 assert!(col.value(1).is_sign_positive());
1788 assert!(col.value(2).is_nan());
1789 }
1790
1791 #[test]
1792 fn test_read_float32_float64_byte_stream_split() {
1793 let path = format!(
1794 "{}/byte_stream_split.zstd.parquet",
1795 arrow::util::test_util::parquet_test_data(),
1796 );
1797 let file = File::open(path).unwrap();
1798 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
1799
1800 let mut row_count = 0;
1801 for batch in record_reader {
1802 let batch = batch.unwrap();
1803 row_count += batch.num_rows();
1804 let f32_col = batch.column(0).as_primitive::<Float32Type>();
1805 let f64_col = batch.column(1).as_primitive::<Float64Type>();
1806
1807 for &x in f32_col.values() {
1809 assert!(x > -10.0);
1810 assert!(x < 10.0);
1811 }
1812 for &x in f64_col.values() {
1813 assert!(x > -10.0);
1814 assert!(x < 10.0);
1815 }
1816 }
1817 assert_eq!(row_count, 300);
1818 }
1819
1820 #[test]
1821 fn test_read_extended_byte_stream_split() {
1822 let path = format!(
1823 "{}/byte_stream_split_extended.gzip.parquet",
1824 arrow::util::test_util::parquet_test_data(),
1825 );
1826 let file = File::open(path).unwrap();
1827 let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();
1828
1829 let mut row_count = 0;
1830 for batch in record_reader {
1831 let batch = batch.unwrap();
1832 row_count += batch.num_rows();
1833
1834 let f16_col = batch.column(0).as_primitive::<Float16Type>();
1836 let f16_bss = batch.column(1).as_primitive::<Float16Type>();
1837 assert_eq!(f16_col.len(), f16_bss.len());
1838 f16_col
1839 .iter()
1840 .zip(f16_bss.iter())
1841 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1842
1843 let f32_col = batch.column(2).as_primitive::<Float32Type>();
1845 let f32_bss = batch.column(3).as_primitive::<Float32Type>();
1846 assert_eq!(f32_col.len(), f32_bss.len());
1847 f32_col
1848 .iter()
1849 .zip(f32_bss.iter())
1850 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1851
1852 let f64_col = batch.column(4).as_primitive::<Float64Type>();
1854 let f64_bss = batch.column(5).as_primitive::<Float64Type>();
1855 assert_eq!(f64_col.len(), f64_bss.len());
1856 f64_col
1857 .iter()
1858 .zip(f64_bss.iter())
1859 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1860
1861 let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
1863 let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
1864 assert_eq!(i32_col.len(), i32_bss.len());
1865 i32_col
1866 .iter()
1867 .zip(i32_bss.iter())
1868 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1869
1870 let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
1872 let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
1873 assert_eq!(i64_col.len(), i64_bss.len());
1874 i64_col
1875 .iter()
1876 .zip(i64_bss.iter())
1877 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1878
1879 let flba_col = batch.column(10).as_fixed_size_binary();
1881 let flba_bss = batch.column(11).as_fixed_size_binary();
1882 assert_eq!(flba_col.len(), flba_bss.len());
1883 flba_col
1884 .iter()
1885 .zip(flba_bss.iter())
1886 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1887
1888 let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
1890 let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
1891 assert_eq!(dec_col.len(), dec_bss.len());
1892 dec_col
1893 .iter()
1894 .zip(dec_bss.iter())
1895 .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
1896 }
1897 assert_eq!(row_count, 200);
1898 }
1899
1900 #[test]
1901 fn test_read_incorrect_map_schema_file() {
1902 let testdata = arrow::util::test_util::parquet_test_data();
1903 let path = format!("{testdata}/incorrect_map_schema.parquet");
1905 let file = File::open(path).unwrap();
1906 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1907
1908 let batch = record_reader.next().unwrap().unwrap();
1909 assert_eq!(batch.num_rows(), 1);
1910
1911 let expected_schema = Schema::new(Fields::from(vec![Field::new(
1912 "my_map",
1913 ArrowDataType::Map(
1914 Arc::new(Field::new(
1915 "key_value",
1916 ArrowDataType::Struct(Fields::from(vec![
1917 Field::new("key", ArrowDataType::Utf8, false),
1918 Field::new("value", ArrowDataType::Utf8, true),
1919 ])),
1920 false,
1921 )),
1922 false,
1923 ),
1924 true,
1925 )]));
1926 assert_eq!(batch.schema().as_ref(), &expected_schema);
1927
1928 assert_eq!(batch.num_rows(), 1);
1929 assert_eq!(batch.column(0).null_count(), 0);
1930 assert_eq!(
1931 batch.column(0).as_map().keys().as_ref(),
1932 &StringArray::from(vec!["parent", "name"])
1933 );
1934 assert_eq!(
1935 batch.column(0).as_map().values().as_ref(),
1936 &StringArray::from(vec!["another", "report"])
1937 );
1938 }
1939
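    /// Options controlling how a single-column round-trip test writes and then reads back
    /// a Parquet file.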
1940 #[derive(Clone)]
1942 struct TestOptions {
1943 num_row_groups: usize,
1946 num_rows: usize,
1948 record_batch_size: usize,
1950 null_percent: Option<usize>,
1952 write_batch_size: usize,
1957 max_data_page_size: usize,
1959 max_dict_page_size: usize,
1961 writer_version: WriterVersion,
1963 enabled_statistics: EnabledStatistics,
1965 encoding: Encoding,
1967 row_selections: Option<(RowSelection, usize)>,
1969 row_filter: Option<Vec<bool>>,
1971 limit: Option<usize>,
1973 offset: Option<usize>,
1975 }
1976
1977 impl std::fmt::Debug for TestOptions {
1979 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1980 f.debug_struct("TestOptions")
1981 .field("num_row_groups", &self.num_row_groups)
1982 .field("num_rows", &self.num_rows)
1983 .field("record_batch_size", &self.record_batch_size)
1984 .field("null_percent", &self.null_percent)
1985 .field("write_batch_size", &self.write_batch_size)
1986 .field("max_data_page_size", &self.max_data_page_size)
1987 .field("max_dict_page_size", &self.max_dict_page_size)
1988 .field("writer_version", &self.writer_version)
1989 .field("enabled_statistics", &self.enabled_statistics)
1990 .field("encoding", &self.encoding)
1991 .field("row_selections", &self.row_selections.is_some())
1992 .field("row_filter", &self.row_filter.is_some())
1993 .field("limit", &self.limit)
1994 .field("offset", &self.offset)
1995 .finish()
1996 }
1997 }
1998
1999 impl Default for TestOptions {
2000 fn default() -> Self {
2001 Self {
2002 num_row_groups: 2,
2003 num_rows: 100,
2004 record_batch_size: 15,
2005 null_percent: None,
2006 write_batch_size: 64,
2007 max_data_page_size: 1024 * 1024,
2008 max_dict_page_size: 1024 * 1024,
2009 writer_version: WriterVersion::PARQUET_1_0,
2010 enabled_statistics: EnabledStatistics::Page,
2011 encoding: Encoding::PLAIN,
2012 row_selections: None,
2013 row_filter: None,
2014 limit: None,
2015 offset: None,
2016 }
2017 }
2018 }
2019
2020 impl TestOptions {
2021 fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
2022 Self {
2023 num_row_groups,
2024 num_rows,
2025 record_batch_size,
2026 ..Default::default()
2027 }
2028 }
2029
2030 fn with_null_percent(self, null_percent: usize) -> Self {
2031 Self {
2032 null_percent: Some(null_percent),
2033 ..self
2034 }
2035 }
2036
2037 fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
2038 Self {
2039 max_data_page_size,
2040 ..self
2041 }
2042 }
2043
2044 fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
2045 Self {
2046 max_dict_page_size,
2047 ..self
2048 }
2049 }
2050
2051 fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
2052 Self {
2053 enabled_statistics,
2054 ..self
2055 }
2056 }
2057
2058 fn with_row_selections(self) -> Self {
2059 assert!(self.row_filter.is_none(), "Must set row selection first");
2060
2061 let mut rng = thread_rng();
2062 let step = rng.gen_range(self.record_batch_size..self.num_rows);
2063 let row_selections =
2064 create_test_selection(step, self.num_row_groups * self.num_rows, rng.gen::<bool>());
2065 Self {
2066 row_selections: Some(row_selections),
2067 ..self
2068 }
2069 }
2070
2071 fn with_row_filter(self) -> Self {
2072 let row_count = match &self.row_selections {
2073 Some((_, count)) => *count,
2074 None => self.num_row_groups * self.num_rows,
2075 };
2076
2077 let mut rng = thread_rng();
2078 Self {
2079 row_filter: Some((0..row_count).map(|_| rng.gen_bool(0.9)).collect()),
2080 ..self
2081 }
2082 }
2083
2084 fn with_limit(self, limit: usize) -> Self {
2085 Self {
2086 limit: Some(limit),
2087 ..self
2088 }
2089 }
2090
2091 fn with_offset(self, offset: usize) -> Self {
2092 Self {
2093 offset: Some(offset),
2094 ..self
2095 }
2096 }
2097
2098 fn writer_props(&self) -> WriterProperties {
2099 let builder = WriterProperties::builder()
2100 .set_data_page_size_limit(self.max_data_page_size)
2101 .set_write_batch_size(self.write_batch_size)
2102 .set_writer_version(self.writer_version)
2103 .set_statistics_enabled(self.enabled_statistics);
2104
2105 let builder = match self.encoding {
2106 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
2107 .set_dictionary_enabled(true)
2108 .set_dictionary_page_size_limit(self.max_dict_page_size),
2109 _ => builder
2110 .set_dictionary_enabled(false)
2111 .set_encoding(self.encoding),
2112 };
2113
2114 builder.build()
2115 }
2116 }
2117
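    /// Runs `single_column_reader_test` over a matrix of [`TestOptions`], writer versions
    /// and encodings for a single leaf column of the Parquet physical type `T`.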
2118 fn run_single_column_reader_tests<T, F, G>(
2125 rand_max: i32,
2126 converted_type: ConvertedType,
2127 arrow_type: Option<ArrowDataType>,
2128 converter: F,
2129 encodings: &[Encoding],
2130 ) where
2131 T: DataType,
2132 G: RandGen<T>,
2133 F: Fn(&[Option<T::T>]) -> ArrayRef,
2134 {
2135 let all_options = vec![
2136 TestOptions::new(2, 100, 15),
2139 TestOptions::new(3, 25, 5),
2144 TestOptions::new(4, 100, 25),
2148 TestOptions::new(3, 256, 73).with_max_data_page_size(128),
2150 TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
2152 TestOptions::new(2, 256, 127).with_null_percent(0),
2154 TestOptions::new(2, 256, 93).with_null_percent(25),
2156 TestOptions::new(4, 100, 25).with_limit(0),
2158 TestOptions::new(4, 100, 25).with_limit(50),
2160 TestOptions::new(4, 100, 25).with_limit(10),
2162 TestOptions::new(4, 100, 25).with_limit(101),
2164 TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
2166 TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
2168 TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
2170 TestOptions::new(2, 256, 91)
2172 .with_null_percent(25)
2173 .with_enabled_statistics(EnabledStatistics::Chunk),
2174 TestOptions::new(2, 256, 91)
2176 .with_null_percent(25)
2177 .with_enabled_statistics(EnabledStatistics::None),
2178 TestOptions::new(2, 128, 91)
2180 .with_null_percent(100)
2181 .with_enabled_statistics(EnabledStatistics::None),
2182 TestOptions::new(2, 100, 15).with_row_selections(),
2187 TestOptions::new(3, 25, 5).with_row_selections(),
2192 TestOptions::new(4, 100, 25).with_row_selections(),
2196 TestOptions::new(3, 256, 73)
2198 .with_max_data_page_size(128)
2199 .with_row_selections(),
2200 TestOptions::new(3, 256, 57)
2202 .with_max_dict_page_size(128)
2203 .with_row_selections(),
2204 TestOptions::new(2, 256, 127)
2206 .with_null_percent(0)
2207 .with_row_selections(),
2208 TestOptions::new(2, 256, 93)
2210 .with_null_percent(25)
2211 .with_row_selections(),
2212 TestOptions::new(2, 256, 93)
2214 .with_null_percent(25)
2215 .with_row_selections()
2216 .with_limit(10),
2217 TestOptions::new(2, 256, 93)
2219 .with_null_percent(25)
2220 .with_row_selections()
2221 .with_offset(20)
2222 .with_limit(10),
2223 TestOptions::new(4, 100, 25).with_row_filter(),
2227 TestOptions::new(4, 100, 25)
2229 .with_row_selections()
2230 .with_row_filter(),
2231 TestOptions::new(2, 256, 93)
2233 .with_null_percent(25)
2234 .with_max_data_page_size(10)
2235 .with_row_filter(),
2236 TestOptions::new(2, 256, 93)
2238 .with_null_percent(25)
2239 .with_max_data_page_size(10)
2240 .with_row_selections()
2241 .with_row_filter(),
2242 TestOptions::new(2, 256, 93)
2244 .with_enabled_statistics(EnabledStatistics::None)
2245 .with_max_data_page_size(10)
2246 .with_row_selections(),
2247 ];
2248
2249 all_options.into_iter().for_each(|opts| {
2250 for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
2251 for encoding in encodings {
2252 let opts = TestOptions {
2253 writer_version,
2254 encoding: *encoding,
2255 ..opts.clone()
2256 };
2257
2258 single_column_reader_test::<T, _, G>(
2259 opts,
2260 rand_max,
2261 converted_type,
2262 arrow_type.clone(),
2263 &converter,
2264 )
2265 }
2266 }
2267 });
2268 }
2269
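    /// Writes a single-column Parquet file of randomly generated values (and nulls) according
    /// to `opts`, reads it back, and asserts the decoded batches match the expected data after
    /// any row selection, filter, offset and limit have been applied.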
2270 fn single_column_reader_test<T, F, G>(
2274 opts: TestOptions,
2275 rand_max: i32,
2276 converted_type: ConvertedType,
2277 arrow_type: Option<ArrowDataType>,
2278 converter: F,
2279 ) where
2280 T: DataType,
2281 G: RandGen<T>,
2282 F: Fn(&[Option<T::T>]) -> ArrayRef,
2283 {
2284 println!(
2286 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2287 T::get_physical_type(), converted_type, arrow_type, opts
2288 );
2289
2290 let (repetition, def_levels) = match opts.null_percent.as_ref() {
2292 Some(null_percent) => {
2293 let mut rng = thread_rng();
2294
2295 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2296 .map(|_| {
2297 std::iter::from_fn(|| {
2298 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2299 })
2300 .take(opts.num_rows)
2301 .collect()
2302 })
2303 .collect();
2304 (Repetition::OPTIONAL, Some(def_levels))
2305 }
2306 None => (Repetition::REQUIRED, None),
2307 };
2308
2309 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2311 .map(|idx| {
2312 let null_count = match def_levels.as_ref() {
2313 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2314 None => 0,
2315 };
2316 G::gen_vec(rand_max, opts.num_rows - null_count)
2317 })
2318 .collect();
2319
2320 let len = match T::get_physical_type() {
2321 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2322 crate::basic::Type::INT96 => 12,
2323 _ => -1,
2324 };
2325
2326 let fields = vec![Arc::new(
2327 Type::primitive_type_builder("leaf", T::get_physical_type())
2328 .with_repetition(repetition)
2329 .with_converted_type(converted_type)
2330 .with_length(len)
2331 .build()
2332 .unwrap(),
2333 )];
2334
2335 let schema = Arc::new(
2336 Type::group_type_builder("test_schema")
2337 .with_fields(fields)
2338 .build()
2339 .unwrap(),
2340 );
2341
2342 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2343
2344 let mut file = tempfile::tempfile().unwrap();
2345
2346 generate_single_column_file_with_data::<T>(
2347 &values,
2348 def_levels.as_ref(),
            file.try_clone().unwrap(),
            schema,
2351 arrow_field,
2352 &opts,
2353 )
2354 .unwrap();
2355
2356 file.rewind().unwrap();
2357
2358 let options = ArrowReaderOptions::new()
2359 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2360
2361 let mut builder =
2362 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2363
2364 let expected_data = match opts.row_selections {
2365 Some((selections, row_count)) => {
2366 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2367
2368 let mut skip_data: Vec<Option<T::T>> = vec![];
2369 let dequeue: VecDeque<RowSelector> = selections.clone().into();
2370 for select in dequeue {
2371 if select.skip {
2372 without_skip_data.drain(0..select.row_count);
2373 } else {
2374 skip_data.extend(without_skip_data.drain(0..select.row_count));
2375 }
2376 }
2377 builder = builder.with_row_selection(selections);
2378
2379 assert_eq!(skip_data.len(), row_count);
2380 skip_data
2381 }
2382 None => {
2383 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2385 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2386 expected_data
2387 }
2388 };
2389
2390 let mut expected_data = match opts.row_filter {
2391 Some(filter) => {
2392 let expected_data = expected_data
2393 .into_iter()
2394 .zip(filter.iter())
                    .filter_map(|(d, f)| f.then_some(d))
2396 .collect();
2397
2398 let mut filter_offset = 0;
2399 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2400 ProjectionMask::all(),
2401 move |b| {
2402 let array = BooleanArray::from_iter(
2403 filter
2404 .iter()
2405 .skip(filter_offset)
2406 .take(b.num_rows())
2407 .map(|x| Some(*x)),
2408 );
2409 filter_offset += b.num_rows();
2410 Ok(array)
2411 },
2412 ))]);
2413
2414 builder = builder.with_row_filter(filter);
2415 expected_data
2416 }
2417 None => expected_data,
2418 };
2419
2420 if let Some(offset) = opts.offset {
2421 builder = builder.with_offset(offset);
2422 expected_data = expected_data.into_iter().skip(offset).collect();
2423 }
2424
2425 if let Some(limit) = opts.limit {
2426 builder = builder.with_limit(limit);
2427 expected_data = expected_data.into_iter().take(limit).collect();
2428 }
2429
2430 let mut record_reader = builder
2431 .with_batch_size(opts.record_batch_size)
2432 .build()
2433 .unwrap();
2434
2435 let mut total_read = 0;
2436 loop {
2437 let maybe_batch = record_reader.next();
2438 if total_read < expected_data.len() {
2439 let end = min(total_read + opts.record_batch_size, expected_data.len());
2440 let batch = maybe_batch.unwrap().unwrap();
2441 assert_eq!(end - total_read, batch.num_rows());
2442
2443 let a = converter(&expected_data[total_read..end]);
2444 let b = Arc::clone(batch.column(0));
2445
2446 assert_eq!(a.data_type(), b.data_type());
2447 assert_eq!(a.to_data(), b.to_data());
2448 assert_eq!(
2449 a.as_any().type_id(),
2450 b.as_any().type_id(),
2451 "incorrect type ids"
2452 );
2453
2454 total_read = end;
2455 } else {
2456 assert!(maybe_batch.is_none());
2457 break;
2458 }
2459 }
2460 }
2461
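// Builds the expected row values for the whole file: each definition level of 1 consumes the
// next generated value in order, while a level of 0 becomes a null.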
2462 fn gen_expected_data<T: DataType>(
2463 def_levels: Option<&Vec<Vec<i16>>>,
2464 values: &[Vec<T::T>],
2465 ) -> Vec<Option<T::T>> {
2466 let data: Vec<Option<T::T>> = match def_levels {
2467 Some(levels) => {
2468 let mut values_iter = values.iter().flatten();
2469 levels
2470 .iter()
2471 .flatten()
2472 .map(|d| match d {
2473 1 => Some(values_iter.next().cloned().unwrap()),
2474 0 => None,
2475 _ => unreachable!(),
2476 })
2477 .collect()
2478 }
2479 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2480 };
2481 data
2482 }
2483
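// Writes `values` with the low-level SerializedFileWriter, one row group per outer entry; if an
// Arrow field is supplied, its schema is embedded in the file's key-value metadata.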
2484 fn generate_single_column_file_with_data<T: DataType>(
2485 values: &[Vec<T::T>],
2486 def_levels: Option<&Vec<Vec<i16>>>,
2487 file: File,
2488 schema: TypePtr,
2489 field: Option<Field>,
2490 opts: &TestOptions,
2491 ) -> Result<crate::format::FileMetaData> {
2492 let mut writer_props = opts.writer_props();
2493 if let Some(field) = field {
2494 let arrow_schema = Schema::new(vec![field]);
2495 add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
2496 }
2497
2498 let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;
2499
2500 for (idx, v) in values.iter().enumerate() {
2501 let def_levels = def_levels.map(|d| d[idx].as_slice());
2502 let mut row_group_writer = writer.next_row_group()?;
2503 {
2504 let mut column_writer = row_group_writer
2505 .next_column()?
2506 .expect("Column writer is none!");
2507
2508 column_writer
2509 .typed::<T>()
2510 .write_batch(v, def_levels, None)?;
2511
2512 column_writer.close()?;
2513 }
2514 row_group_writer.close()?;
2515 }
2516
2517 writer.close()
2518 }
2519
2520 fn get_test_file(file_name: &str) -> File {
2521 let mut path = PathBuf::new();
2522 path.push(arrow::util::test_util::arrow_test_data());
2523 path.push(file_name);
2524
2525 File::open(path.as_path()).expect("File not found!")
2526 }
2527
2528 #[test]
2529 fn test_read_structs() {
2530 let testdata = arrow::util::test_util::parquet_test_data();
2534 let path = format!("{testdata}/nested_structs.rust.parquet");
2535 let file = File::open(&path).unwrap();
2536 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2537
2538 for batch in record_batch_reader {
2539 batch.unwrap();
2540 }
2541
2542 let file = File::open(&path).unwrap();
2543 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2544
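// Leaf columns 3, 8 and 10 correspond to roll_num.count, PC_CUR.mean and PC_CUR.sum; the
// by-name test below selects the same fields and asserts the same projected schema.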
2545 let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
2546 let projected_reader = builder
2547 .with_projection(mask)
2548 .with_batch_size(60)
2549 .build()
2550 .unwrap();
2551
2552 let expected_schema = Schema::new(vec![
2553 Field::new(
2554 "roll_num",
2555 ArrowDataType::Struct(Fields::from(vec![Field::new(
2556 "count",
2557 ArrowDataType::UInt64,
2558 false,
2559 )])),
2560 false,
2561 ),
2562 Field::new(
2563 "PC_CUR",
2564 ArrowDataType::Struct(Fields::from(vec![
2565 Field::new("mean", ArrowDataType::Int64, false),
2566 Field::new("sum", ArrowDataType::Int64, false),
2567 ])),
2568 false,
2569 ),
2570 ]);
2571
2572 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2574
2575 for batch in projected_reader {
2576 let batch = batch.unwrap();
2577 assert_eq!(batch.schema().as_ref(), &expected_schema);
2578 }
2579 }
2580
2581 #[test]
2582 fn test_read_structs_by_name() {
2584 let testdata = arrow::util::test_util::parquet_test_data();
2585 let path = format!("{testdata}/nested_structs.rust.parquet");
2586 let file = File::open(&path).unwrap();
2587 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2588
2589 for batch in record_batch_reader {
2590 batch.unwrap();
2591 }
2592
2593 let file = File::open(&path).unwrap();
2594 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2595
2596 let mask = ProjectionMask::columns(
2597 builder.parquet_schema(),
2598 ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
2599 );
2600 let projected_reader = builder
2601 .with_projection(mask)
2602 .with_batch_size(60)
2603 .build()
2604 .unwrap();
2605
2606 let expected_schema = Schema::new(vec![
2607 Field::new(
2608 "roll_num",
2609 ArrowDataType::Struct(Fields::from(vec![Field::new(
2610 "count",
2611 ArrowDataType::UInt64,
2612 false,
2613 )])),
2614 false,
2615 ),
2616 Field::new(
2617 "PC_CUR",
2618 ArrowDataType::Struct(Fields::from(vec![
2619 Field::new("mean", ArrowDataType::Int64, false),
2620 Field::new("sum", ArrowDataType::Int64, false),
2621 ])),
2622 false,
2623 ),
2624 ]);
2625
2626 assert_eq!(&expected_schema, projected_reader.schema().as_ref());
2627
2628 for batch in projected_reader {
2629 let batch = batch.unwrap();
2630 assert_eq!(batch.schema().as_ref(), &expected_schema);
2631 }
2632 }
2633
2634 #[test]
2635 fn test_read_maps() {
2636 let testdata = arrow::util::test_util::parquet_test_data();
2637 let path = format!("{testdata}/nested_maps.snappy.parquet");
2638 let file = File::open(path).unwrap();
2639 let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
2640
2641 for batch in record_batch_reader {
2642 batch.unwrap();
2643 }
2644 }
2645
2646 #[test]
2647 fn test_nested_nullability() {
2648 let message_type = "message nested {
2649 OPTIONAL Group group {
2650 REQUIRED INT32 leaf;
2651 }
2652 }";
2653
2654 let file = tempfile::tempfile().unwrap();
2655 let schema = Arc::new(parse_message_type(message_type).unwrap());
2656
2657 {
2658 let mut writer =
2660 SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
2661 .unwrap();
2662
2663 {
2664 let mut row_group_writer = writer.next_row_group().unwrap();
2665 let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
2666
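// Definition levels [0, 1, 0, 1]: rows 0 and 2 have a null `group`, rows 1 and 3 carry the
// leaf values 34 and 76.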
2667 column_writer
2668 .typed::<Int32Type>()
2669 .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
2670 .unwrap();
2671
2672 column_writer.close().unwrap();
2673 row_group_writer.close().unwrap();
2674 }
2675
2676 writer.close().unwrap();
2677 }
2678
2679 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
2680 let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
2681
2682 let reader = builder.with_projection(mask).build().unwrap();
2683
2684 let expected_schema = Schema::new(Fields::from(vec![Field::new(
2685 "group",
2686 ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
2687 true,
2688 )]));
2689
2690 let batch = reader.into_iter().next().unwrap().unwrap();
2691 assert_eq!(batch.schema().as_ref(), &expected_schema);
2692 assert_eq!(batch.num_rows(), 4);
2693 assert_eq!(batch.column(0).null_count(), 2);
2694 }
2695
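// Pre-encoded bytes of a tiny Parquet file whose byte-array column stores the 5-byte value
// `he\xFFlo`; the 0xFF byte makes it invalid UTF-8.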
2696 #[test]
2697 fn test_invalid_utf8() {
2698 let data = vec![
2700 80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
2701 18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
2702 3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
2703 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
2704 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
2705 21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
2706 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
2707 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
2708 0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
2709 118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
2710 116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
2711 82, 49,
2712 ];
2713
2714 let file = Bytes::from(data);
2715 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();
2716
2717 let error = record_batch_reader.next().unwrap().unwrap_err();
2718
2719 assert!(
2720 error.to_string().contains("invalid utf-8 sequence"),
2721 "{}",
2722 error
2723 );
2724 }
2725
2726 #[test]
2727 fn test_invalid_utf8_string_array() {
2728 test_invalid_utf8_string_array_inner::<i32>();
2729 }
2730
2731 #[test]
2732 fn test_invalid_utf8_large_string_array() {
2733 test_invalid_utf8_string_array_inner::<i64>();
2734 }
2735
2736 fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
2737 let cases = [
2738 invalid_utf8_first_char::<O>(),
2739 invalid_utf8_first_char_long_strings::<O>(),
2740 invalid_utf8_later_char::<O>(),
2741 invalid_utf8_later_char_long_strings::<O>(),
2742 invalid_utf8_later_char_really_long_strings::<O>(),
2743 invalid_utf8_later_char_really_long_strings2::<O>(),
2744 ];
2745 for array in &cases {
2746 for encoding in STRING_ENCODINGS {
2747 let array = unsafe {
2750 GenericStringArray::<O>::new_unchecked(
2751 array.offsets().clone(),
2752 array.values().clone(),
2753 array.nulls().cloned(),
2754 )
2755 };
2756 let data_type = array.data_type().clone();
2757 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
2758 let err = read_from_parquet(data).unwrap_err();
2759 let expected_err =
2760 "Parquet argument error: Parquet error: encountered non UTF-8 data";
2761 assert!(
2762 err.to_string().contains(expected_err),
2763 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
2764 );
2765 }
2766 }
2767 }
2768
2769 #[test]
2770 fn test_invalid_utf8_string_view_array() {
2771 let cases = [
2772 invalid_utf8_first_char::<i32>(),
2773 invalid_utf8_first_char_long_strings::<i32>(),
2774 invalid_utf8_later_char::<i32>(),
2775 invalid_utf8_later_char_long_strings::<i32>(),
2776 invalid_utf8_later_char_really_long_strings::<i32>(),
2777 invalid_utf8_later_char_really_long_strings2::<i32>(),
2778 ];
2779
2780 for encoding in STRING_ENCODINGS {
2781 for array in &cases {
2782 let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
2783 let array = array.as_binary_view();
2784
2785 let array = unsafe {
2788 StringViewArray::new_unchecked(
2789 array.views().clone(),
2790 array.data_buffers().to_vec(),
2791 array.nulls().cloned(),
2792 )
2793 };
2794
2795 let data_type = array.data_type().clone();
2796 let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
2797 let err = read_from_parquet(data).unwrap_err();
2798 let expected_err =
2799 "Parquet argument error: Parquet error: encountered non UTF-8 data";
2800 assert!(
2801 err.to_string().contains(expected_err),
2802 "data type: {data_type:?}, expected: {expected_err}, got: {err}"
2803 );
2804 }
2805 }
2806 }
2807
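// Encodings exercised by the UTF-8 validation tests: `None` keeps the writer defaults
// (dictionary encoding), the others disable the dictionary and force the listed encoding
// (see write_to_parquet_with_encoding below).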
2808 const STRING_ENCODINGS: &[Option<Encoding>] = &[
2810 None,
2811 Some(Encoding::PLAIN),
2812 Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
2813 Some(Encoding::DELTA_BYTE_ARRAY),
2814 ];
2815
2816 const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];
2819
2820 const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
2823
2824 fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2826 let valid: &[u8] = b" ";
2827 let invalid = INVALID_UTF8_FIRST_CHAR;
2828 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2829 }
2830
2831 fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2835 let valid: &[u8] = b" ";
2836 let mut invalid = vec![];
2837 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2838 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2839 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2840 }
2841
2842 fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2845 let valid: &[u8] = b" ";
2846 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
2847 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2848 }
2849
2850 fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2854 let valid: &[u8] = b" ";
2855 let mut invalid = vec![];
2856 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2857 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2858 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2859 }
2860
2861 fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2865 let valid: &[u8] = b" ";
2866 let mut invalid = vec![];
2867 for _ in 0..10 {
2868 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2870 }
2871 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2872 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2873 }
2874
2875 fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2878 let valid: &[u8] = b" ";
2879 let mut valid_long = vec![];
2880 for _ in 0..10 {
2881 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2883 }
2884 let invalid = INVALID_UTF8_LATER_CHAR;
2885 GenericBinaryArray::<O>::from_iter(vec![
2886 None,
2887 Some(valid),
2888 Some(invalid),
2889 None,
2890 Some(&valid_long),
2891 Some(valid),
2892 ])
2893 }
2894
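// Writes a single-column batch; when an encoding is supplied, dictionary encoding is disabled
// and that encoding is forced, otherwise default writer properties are used.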
2895 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
2900 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
2901 let mut data = vec![];
2902 let schema = batch.schema();
2903 let props = encoding.map(|encoding| {
2904 WriterProperties::builder()
2905 .set_dictionary_enabled(false)
2907 .set_encoding(encoding)
2908 .build()
2909 });
2910
2911 {
2912 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
2913 writer.write(&batch).unwrap();
2914 writer.flush().unwrap();
2915 writer.close().unwrap();
2916 };
2917 data
2918 }
2919
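// Reads every batch back from the in-memory file, returning the first decode error if one occurs.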
2920 fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
2922 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
2923 .unwrap()
2924 .build()
2925 .unwrap();
2926
2927 reader.collect()
2928 }
2929
2930 #[test]
2931 fn test_dictionary_preservation() {
2932 let fields = vec![Arc::new(
2933 Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
2934 .with_repetition(Repetition::OPTIONAL)
2935 .with_converted_type(ConvertedType::UTF8)
2936 .build()
2937 .unwrap(),
2938 )];
2939
2940 let schema = Arc::new(
2941 Type::group_type_builder("test_schema")
2942 .with_fields(fields)
2943 .build()
2944 .unwrap(),
2945 );
2946
2947 let dict_type = ArrowDataType::Dictionary(
2948 Box::new(ArrowDataType::Int32),
2949 Box::new(ArrowDataType::Utf8),
2950 );
2951
2952 let arrow_field = Field::new("leaf", dict_type, true);
2953
2954 let mut file = tempfile::tempfile().unwrap();
2955
2956 let values = vec![
2957 vec![
2958 ByteArray::from("hello"),
2959 ByteArray::from("a"),
2960 ByteArray::from("b"),
2961 ByteArray::from("d"),
2962 ],
2963 vec![
2964 ByteArray::from("c"),
2965 ByteArray::from("a"),
2966 ByteArray::from("b"),
2967 ],
2968 ];
2969
2970 let def_levels = vec![
2971 vec![1, 0, 0, 1, 0, 0, 1, 1],
2972 vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
2973 ];
2974
2975 let opts = TestOptions {
2976 encoding: Encoding::RLE_DICTIONARY,
2977 ..Default::default()
2978 };
2979
2980 generate_single_column_file_with_data::<ByteArrayType>(
2981 &values,
2982 Some(&def_levels),
2983 file.try_clone().unwrap(),
2984 schema,
2985 Some(arrow_field),
2986 &opts,
2987 )
2988 .unwrap();
2989
2990 file.rewind().unwrap();
2991
2992 let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();
2993
2994 let batches = record_reader
2995 .collect::<Result<Vec<RecordBatch>, _>>()
2996 .unwrap();
2997
2998 assert_eq!(batches.len(), 6);
2999 assert!(batches.iter().all(|x| x.num_columns() == 1));
3000
3001 let row_counts = batches
3002 .iter()
3003 .map(|x| (x.num_rows(), x.column(0).null_count()))
3004 .collect::<Vec<_>>();
3005
3006 assert_eq!(
3007 row_counts,
3008 vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
3009 );
3010
3011 let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();
3012
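// Row group 1 holds 8 rows and row group 2 holds 9; with a batch size of 3, batches 0-1 come
// entirely from row group 1, batch 2 spans the row-group boundary, and batches 3-5 come from
// row group 2, which explains which dictionaries match below.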
3013 assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
3015 assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
3017 assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
3018 assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
3020 assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
3021 }
3022
3023 #[test]
3024 fn test_read_null_list() {
3025 let testdata = arrow::util::test_util::parquet_test_data();
3026 let path = format!("{testdata}/null_list.parquet");
3027 let file = File::open(path).unwrap();
3028 let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();
3029
3030 let batch = record_batch_reader.next().unwrap().unwrap();
3031 assert_eq!(batch.num_rows(), 1);
3032 assert_eq!(batch.num_columns(), 1);
3033 assert_eq!(batch.column(0).len(), 1);
3034
3035 let list = batch
3036 .column(0)
3037 .as_any()
3038 .downcast_ref::<ListArray>()
3039 .unwrap();
3040 assert_eq!(list.len(), 1);
3041 assert!(list.is_valid(0));
3042
3043 let val = list.value(0);
3044 assert_eq!(val.len(), 0);
3045 }
3046
3047 #[test]
3048 fn test_null_schema_inference() {
3049 let testdata = arrow::util::test_util::parquet_test_data();
3050 let path = format!("{testdata}/null_list.parquet");
3051 let file = File::open(path).unwrap();
3052
3053 let arrow_field = Field::new(
3054 "emptylist",
3055 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
3056 true,
3057 );
3058
3059 let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3060 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3061 let schema = builder.schema();
3062 assert_eq!(schema.fields().len(), 1);
3063 assert_eq!(schema.field(0), &arrow_field);
3064 }
3065
3066 #[test]
3067 fn test_skip_metadata() {
3068 let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
3069 let field = Field::new("col", col.data_type().clone(), true);
3070
3071 let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));
3072
3073 let metadata = [("key".to_string(), "value".to_string())]
3074 .into_iter()
3075 .collect();
3076
3077 let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));
3078
3079 assert_ne!(schema_with_metadata, schema_without_metadata);
3080
3081 let batch =
3082 RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();
3083
3084 let file = |version: WriterVersion| {
3085 let props = WriterProperties::builder()
3086 .set_writer_version(version)
3087 .build();
3088
3089 let file = tempfile().unwrap();
3090 let mut writer =
3091 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
3092 .unwrap();
3093 writer.write(&batch).unwrap();
3094 writer.close().unwrap();
3095 file
3096 };
3097
3098 let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
3099
3100 let v1_reader = file(WriterVersion::PARQUET_1_0);
3101 let v2_reader = file(WriterVersion::PARQUET_2_0);
3102
3103 let arrow_reader =
3104 ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
3105 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3106
3107 let reader =
3108 ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
3109 .unwrap()
3110 .build()
3111 .unwrap();
3112 assert_eq!(reader.schema(), schema_without_metadata);
3113
3114 let arrow_reader =
3115 ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
3116 assert_eq!(arrow_reader.schema(), schema_with_metadata);
3117
3118 let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
3119 .unwrap()
3120 .build()
3121 .unwrap();
3122 assert_eq!(reader.schema(), schema_without_metadata);
3123 }
3124
3125 fn write_parquet_from_iter<I, F>(value: I) -> File
3126 where
3127 I: IntoIterator<Item = (F, ArrayRef)>,
3128 F: AsRef<str>,
3129 {
3130 let batch = RecordBatch::try_from_iter(value).unwrap();
3131 let file = tempfile().unwrap();
3132 let mut writer =
3133 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3134 writer.write(&batch).unwrap();
3135 writer.close().unwrap();
3136 file
3137 }
3138
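// Writes the given columns to a temporary file, then asserts that building a reader with the
// supplied schema (ArrowReaderOptions::with_schema) fails with `expected_error`.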
3139 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3140 where
3141 I: IntoIterator<Item = (F, ArrayRef)>,
3142 F: AsRef<str>,
3143 {
3144 let file = write_parquet_from_iter(value);
3145 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3146 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3147 file.try_clone().unwrap(),
3148 options_with_schema,
3149 );
3150 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3151 }
3152
3153 #[test]
3154 fn test_schema_too_few_columns() {
3155 run_schema_test_with_error(
3156 vec![
3157 ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3158 ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3159 ],
3160 Arc::new(Schema::new(vec![Field::new(
3161 "int64",
3162 ArrowDataType::Int64,
3163 false,
3164 )])),
3165 "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
3166 );
3167 }
3168
3169 #[test]
3170 fn test_schema_too_many_columns() {
3171 run_schema_test_with_error(
3172 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3173 Arc::new(Schema::new(vec![
3174 Field::new("int64", ArrowDataType::Int64, false),
3175 Field::new("int32", ArrowDataType::Int32, false),
3176 ])),
3177 "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
3178 );
3179 }
3180
3181 #[test]
3182 fn test_schema_mismatched_column_names() {
3183 run_schema_test_with_error(
3184 vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
3185 Arc::new(Schema::new(vec![Field::new(
3186 "other",
3187 ArrowDataType::Int64,
3188 false,
3189 )])),
3190 "Arrow: incompatible arrow schema, expected field named int64 got other",
3191 );
3192 }
3193
3194 #[test]
3195 fn test_schema_incompatible_columns() {
3196 run_schema_test_with_error(
3197 vec![
3198 (
3199 "col1_invalid",
3200 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3201 ),
3202 (
3203 "col2_valid",
3204 Arc::new(Int32Array::from(vec![0])) as ArrayRef,
3205 ),
3206 (
3207 "col3_invalid",
3208 Arc::new(Date64Array::from(vec![0])) as ArrayRef,
3209 ),
3210 ],
3211 Arc::new(Schema::new(vec![
3212 Field::new("col1_invalid", ArrowDataType::Int32, false),
3213 Field::new("col2_valid", ArrowDataType::Int32, false),
3214 Field::new("col3_invalid", ArrowDataType::Int32, false),
3215 ])),
3216 "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
3217 );
3218 }
3219
3220 #[test]
3221 fn test_one_incompatible_nested_column() {
3222 let nested_fields = Fields::from(vec![
3223 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3224 Field::new("nested1_invalid", ArrowDataType::Int64, false),
3225 ]);
3226 let nested = StructArray::try_new(
3227 nested_fields,
3228 vec![
3229 Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
3230 Arc::new(Int64Array::from(vec![0])) as ArrayRef,
3231 ],
3232 None,
3233 )
3234 .expect("struct array");
3235 let supplied_nested_fields = Fields::from(vec![
3236 Field::new("nested1_valid", ArrowDataType::Utf8, false),
3237 Field::new("nested1_invalid", ArrowDataType::Int32, false),
3238 ]);
3239 run_schema_test_with_error(
3240 vec![
3241 ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
3242 ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
3243 ("nested", Arc::new(nested) as ArrayRef),
3244 ],
3245 Arc::new(Schema::new(vec![
3246 Field::new("col1", ArrowDataType::Int64, false),
3247 Field::new("col2", ArrowDataType::Int32, false),
3248 Field::new(
3249 "nested",
3250 ArrowDataType::Struct(supplied_nested_fields),
3251 false,
3252 ),
3253 ])),
3254 "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
3255 );
3256 }
3257
3258 #[test]
3259 fn test_read_binary_as_utf8() {
3260 let file = write_parquet_from_iter(vec![
3261 (
3262 "binary_to_utf8",
3263 Arc::new(BinaryArray::from(vec![
3264 b"one".as_ref(),
3265 b"two".as_ref(),
3266 b"three".as_ref(),
3267 ])) as ArrayRef,
3268 ),
3269 (
3270 "large_binary_to_large_utf8",
3271 Arc::new(LargeBinaryArray::from(vec![
3272 b"one".as_ref(),
3273 b"two".as_ref(),
3274 b"three".as_ref(),
3275 ])) as ArrayRef,
3276 ),
3277 (
3278 "binary_view_to_utf8_view",
3279 Arc::new(BinaryViewArray::from(vec![
3280 b"one".as_ref(),
3281 b"two".as_ref(),
3282 b"three".as_ref(),
3283 ])) as ArrayRef,
3284 ),
3285 ]);
3286 let supplied_fields = Fields::from(vec![
3287 Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
3288 Field::new(
3289 "large_binary_to_large_utf8",
3290 ArrowDataType::LargeUtf8,
3291 false,
3292 ),
3293 Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
3294 ]);
3295
3296 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3297 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3298 file.try_clone().unwrap(),
3299 options,
3300 )
3301 .expect("reader builder with schema")
3302 .build()
3303 .expect("reader with schema");
3304
3305 let batch = arrow_reader.next().unwrap().unwrap();
3306 assert_eq!(batch.num_columns(), 3);
3307 assert_eq!(batch.num_rows(), 3);
3308 assert_eq!(
3309 batch
3310 .column(0)
3311 .as_string::<i32>()
3312 .iter()
3313 .collect::<Vec<_>>(),
3314 vec![Some("one"), Some("two"), Some("three")]
3315 );
3316
3317 assert_eq!(
3318 batch
3319 .column(1)
3320 .as_string::<i64>()
3321 .iter()
3322 .collect::<Vec<_>>(),
3323 vec![Some("one"), Some("two"), Some("three")]
3324 );
3325
3326 assert_eq!(
3327 batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
3328 vec![Some("one"), Some("two"), Some("three")]
3329 );
3330 }
3331
3332 #[test]
3333 #[should_panic(expected = "Invalid UTF8 sequence at")]
3334 fn test_read_non_utf8_binary_as_utf8() {
3335 let file = write_parquet_from_iter(vec![(
3336 "non_utf8_binary",
3337 Arc::new(BinaryArray::from(vec![
3338 b"\xDE\x00\xFF".as_ref(),
3339 b"\xDE\x01\xAA".as_ref(),
3340 b"\xDE\x02\xFF".as_ref(),
3341 ])) as ArrayRef,
3342 )]);
3343 let supplied_fields = Fields::from(vec![Field::new(
3344 "non_utf8_binary",
3345 ArrowDataType::Utf8,
3346 false,
3347 )]);
3348
3349 let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
3350 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3351 file.try_clone().unwrap(),
3352 options,
3353 )
3354 .expect("reader builder with schema")
3355 .build()
3356 .expect("reader with schema");
3357 arrow_reader.next().unwrap().unwrap_err();
3358 }
3359
3360 #[test]
3361 fn test_with_schema() {
3362 let nested_fields = Fields::from(vec![
3363 Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
3364 Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
3365 ]);
3366
3367 let nested_arrays: Vec<ArrayRef> = vec![
3368 Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
3369 Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
3370 ];
3371
3372 let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();
3373
3374 let file = write_parquet_from_iter(vec![
3375 (
3376 "int32_to_ts_second",
3377 Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3378 ),
3379 (
3380 "date32_to_date64",
3381 Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
3382 ),
3383 ("nested", Arc::new(nested) as ArrayRef),
3384 ]);
3385
3386 let supplied_nested_fields = Fields::from(vec![
3387 Field::new(
3388 "utf8_to_dict",
3389 ArrowDataType::Dictionary(
3390 Box::new(ArrowDataType::Int32),
3391 Box::new(ArrowDataType::Utf8),
3392 ),
3393 false,
3394 ),
3395 Field::new(
3396 "int64_to_ts_nano",
3397 ArrowDataType::Timestamp(
3398 arrow::datatypes::TimeUnit::Nanosecond,
3399 Some("+10:00".into()),
3400 ),
3401 false,
3402 ),
3403 ]);
3404
3405 let supplied_schema = Arc::new(Schema::new(vec![
3406 Field::new(
3407 "int32_to_ts_second",
3408 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
3409 false,
3410 ),
3411 Field::new("date32_to_date64", ArrowDataType::Date64, false),
3412 Field::new(
3413 "nested",
3414 ArrowDataType::Struct(supplied_nested_fields),
3415 false,
3416 ),
3417 ]));
3418
3419 let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
3420 let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
3421 file.try_clone().unwrap(),
3422 options,
3423 )
3424 .expect("reader builder with schema")
3425 .build()
3426 .expect("reader with schema");
3427
3428 assert_eq!(arrow_reader.schema(), supplied_schema);
3429 let batch = arrow_reader.next().unwrap().unwrap();
3430 assert_eq!(batch.num_columns(), 3);
3431 assert_eq!(batch.num_rows(), 4);
3432 assert_eq!(
3433 batch
3434 .column(0)
3435 .as_any()
3436 .downcast_ref::<TimestampSecondArray>()
3437 .expect("downcast to timestamp second")
3438 .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
3439 .map(|v| v.to_string())
3440 .expect("value as datetime"),
3441 "1970-01-01 01:00:00 +01:00"
3442 );
3443 assert_eq!(
3444 batch
3445 .column(1)
3446 .as_any()
3447 .downcast_ref::<Date64Array>()
3448 .expect("downcast to date64")
3449 .value_as_date(0)
3450 .map(|v| v.to_string())
3451 .expect("value as date"),
3452 "1970-01-01"
3453 );
3454
3455 let nested = batch
3456 .column(2)
3457 .as_any()
3458 .downcast_ref::<StructArray>()
3459 .expect("downcast to struct");
3460
3461 let nested_dict = nested
3462 .column(0)
3463 .as_any()
3464 .downcast_ref::<Int32DictionaryArray>()
3465 .expect("downcast to dictionary");
3466
3467 assert_eq!(
3468 nested_dict
3469 .values()
3470 .as_any()
3471 .downcast_ref::<StringArray>()
3472 .expect("downcast to string")
3473 .iter()
3474 .collect::<Vec<_>>(),
3475 vec![Some("a"), Some("b")]
3476 );
3477
3478 assert_eq!(
3479 nested_dict.keys().iter().collect::<Vec<_>>(),
3480 vec![Some(0), Some(0), Some(0), Some(1)]
3481 );
3482
3483 assert_eq!(
3484 nested
3485 .column(1)
3486 .as_any()
3487 .downcast_ref::<TimestampNanosecondArray>()
3488 .expect("downcast to timestamp nanosecond")
3489 .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
3490 .map(|v| v.to_string())
3491 .expect("value as datetime"),
3492 "1970-01-01 10:00:00.000000001 +10:00"
3493 );
3494 }
3495
3496 #[test]
3497 fn test_empty_projection() {
3498 let testdata = arrow::util::test_util::parquet_test_data();
3499 let path = format!("{testdata}/alltypes_plain.parquet");
3500 let file = File::open(path).unwrap();
3501
3502 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
3503 let file_metadata = builder.metadata().file_metadata();
3504 let expected_rows = file_metadata.num_rows() as usize;
3505
3506 let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
3507 let batch_reader = builder
3508 .with_projection(mask)
3509 .with_batch_size(2)
3510 .build()
3511 .unwrap();
3512
3513 let mut total_rows = 0;
3514 for maybe_batch in batch_reader {
3515 let batch = maybe_batch.unwrap();
3516 total_rows += batch.num_rows();
3517 assert_eq!(batch.num_columns(), 0);
3518 assert!(batch.num_rows() <= 2);
3519 }
3520
3521 assert_eq!(total_rows, expected_rows);
3522 }
3523
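// Writes two record batches of `batch_size` list rows with a row-group size cap of
// `row_group_size`, then checks the reader still returns full `batch_size` batches across
// row-group boundaries.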
3524 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3525 let schema = Arc::new(Schema::new(vec![Field::new(
3526 "list",
3527 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3528 true,
3529 )]));
3530
3531 let mut buf = Vec::with_capacity(1024);
3532
3533 let mut writer = ArrowWriter::try_new(
3534 &mut buf,
3535 schema.clone(),
3536 Some(
3537 WriterProperties::builder()
3538 .set_max_row_group_size(row_group_size)
3539 .build(),
3540 ),
3541 )
3542 .unwrap();
3543 for _ in 0..2 {
3544 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3545 for _ in 0..(batch_size) {
3546 list_builder.append(true);
3547 }
3548 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3549 .unwrap();
3550 writer.write(&batch).unwrap();
3551 }
3552 writer.close().unwrap();
3553
3554 let mut record_reader =
3555 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3556 assert_eq!(
3557 batch_size,
3558 record_reader.next().unwrap().unwrap().num_rows()
3559 );
3560 assert_eq!(
3561 batch_size,
3562 record_reader.next().unwrap().unwrap().num_rows()
3563 );
3564 }
3565
3566 #[test]
3567 fn test_row_group_exact_multiple() {
3568 const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
3569 test_row_group_batch(8, 8);
3570 test_row_group_batch(10, 8);
3571 test_row_group_batch(8, 10);
3572 test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
3573 test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
3574 test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
3575 test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
3576 test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
3577 }
3578
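// Computes the RecordBatches a reader should produce for `selection`: walks the selectors,
// slicing `column` for selected runs and splitting the output every `batch_size` rows.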
3579 fn get_expected_batches(
3582 column: &RecordBatch,
3583 selection: &RowSelection,
3584 batch_size: usize,
3585 ) -> Vec<RecordBatch> {
3586 let mut expected_batches = vec![];
3587
3588 let mut selection: VecDeque<_> = selection.clone().into();
3589 let mut row_offset = 0;
3590 let mut last_start = None;
3591 while row_offset < column.num_rows() && !selection.is_empty() {
3592 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3593 while batch_remaining > 0 && !selection.is_empty() {
3594 let (to_read, skip) = match selection.front_mut() {
3595 Some(selection) if selection.row_count > batch_remaining => {
3596 selection.row_count -= batch_remaining;
3597 (batch_remaining, selection.skip)
3598 }
3599 Some(_) => {
3600 let select = selection.pop_front().unwrap();
3601 (select.row_count, select.skip)
3602 }
3603 None => break,
3604 };
3605
3606 batch_remaining -= to_read;
3607
3608 match skip {
3609 true => {
3610 if let Some(last_start) = last_start.take() {
3611 expected_batches.push(column.slice(last_start, row_offset - last_start))
3612 }
3613 row_offset += to_read
3614 }
3615 false => {
3616 last_start.get_or_insert(row_offset);
3617 row_offset += to_read
3618 }
3619 }
3620 }
3621 }
3622
3623 if let Some(last_start) = last_start.take() {
3624 expected_batches.push(column.slice(last_start, row_offset - last_start))
3625 }
3626
3627 for batch in &expected_batches[..expected_batches.len() - 1] {
3629 assert_eq!(batch.num_rows(), batch_size);
3630 }
3631
3632 expected_batches
3633 }
3634
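// Builds a RowSelection that alternates skip/select runs of `step_len` rows over `total_len`
// rows (starting with a skip when `skip_first` is set) and returns it together with the number
// of selected rows.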
3635 fn create_test_selection(
3636 step_len: usize,
3637 total_len: usize,
3638 skip_first: bool,
3639 ) -> (RowSelection, usize) {
3640 let mut remaining = total_len;
3641 let mut skip = skip_first;
3642 let mut vec = vec![];
3643 let mut selected_count = 0;
3644 while remaining != 0 {
3645 let step = if remaining > step_len {
3646 step_len
3647 } else {
3648 remaining
3649 };
3650 vec.push(RowSelector {
3651 row_count: step,
3652 skip,
3653 });
3654 remaining -= step;
3655 if !skip {
3656 selected_count += step;
3657 }
3658 skip = !skip;
3659 }
3660 (vec.into(), selected_count)
3661 }
3662
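// Reads alltypes_tiny_pages_plain.parquet as a single reference batch (batch size 7300), then
// verifies that readers configured with alternating skip/select selections return exactly the
// matching slices of that reference data.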
3663 #[test]
3664 fn test_scan_row_with_selection() {
3665 let testdata = arrow::util::test_util::parquet_test_data();
3666 let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
3667 let test_file = File::open(&path).unwrap();
3668
3669 let mut serial_reader =
3670 ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
3671 let data = serial_reader.next().unwrap().unwrap();
3672
3673 let do_test = |batch_size: usize, selection_len: usize| {
3674 for skip_first in [false, true] {
3675 let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;
3676
3677 let expected = get_expected_batches(&data, &selections, batch_size);
3678 let skip_reader = create_skip_reader(&test_file, batch_size, selections);
3679 assert_eq!(
3680 skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
3681 expected,
3682 "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
3683 );
3684 }
3685 };
3686
3687 do_test(1000, 1000);
3690
3691 do_test(20, 20);
3693
3694 do_test(20, 5);
3696
3697 do_test(20, 5);
3700
3701 fn create_skip_reader(
3702 test_file: &File,
3703 batch_size: usize,
3704 selections: RowSelection,
3705 ) -> ParquetRecordBatchReader {
3706 let options = ArrowReaderOptions::new().with_page_index(true);
3707 let file = test_file.try_clone().unwrap();
3708 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
3709 .unwrap()
3710 .with_batch_size(batch_size)
3711 .with_row_selection(selections)
3712 .build()
3713 .unwrap()
3714 }
3715 }
3716
3717 #[test]
3718 fn test_batch_size_overallocate() {
3719 let testdata = arrow::util::test_util::parquet_test_data();
3720 let path = format!("{testdata}/alltypes_plain.parquet");
3722 let test_file = File::open(path).unwrap();
3723
3724 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
3725 let num_rows = builder.metadata.file_metadata().num_rows();
3726 let reader = builder
3727 .with_batch_size(1024)
3728 .with_projection(ProjectionMask::all())
3729 .build()
3730 .unwrap();
3731 assert_ne!(1024, num_rows);
3732 assert_eq!(reader.batch_size, num_rows as usize);
3733 }
3734
3735 #[test]
3736 fn test_read_with_page_index_enabled() {
3737 let testdata = arrow::util::test_util::parquet_test_data();
3738
3739 {
3740 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
3742 let test_file = File::open(path).unwrap();
3743 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3744 test_file,
3745 ArrowReaderOptions::new().with_page_index(true),
3746 )
3747 .unwrap();
3748 assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
3749 let reader = builder.build().unwrap();
3750 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3751 assert_eq!(batches.len(), 8);
3752 }
3753
3754 {
3755 let path = format!("{testdata}/alltypes_plain.parquet");
3757 let test_file = File::open(path).unwrap();
3758 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3759 test_file,
3760 ArrowReaderOptions::new().with_page_index(true),
3761 )
3762 .unwrap();
3763 assert!(builder.metadata().offset_index().is_none());
3766 let reader = builder.build().unwrap();
3767 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3768 assert_eq!(batches.len(), 1);
3769 }
3770 }
3771
3772 #[test]
3773 fn test_raw_repetition() {
3774 const MESSAGE_TYPE: &str = "
3775 message Log {
3776 OPTIONAL INT32 eventType;
3777 REPEATED INT32 category;
3778 REPEATED group filter {
3779 OPTIONAL INT32 error;
3780 }
3781 }
3782 ";
3783 let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
3784 let props = Default::default();
3785
3786 let mut buf = Vec::with_capacity(1024);
3787 let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
3788 let mut row_group_writer = writer.next_row_group().unwrap();
3789
3790 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3792 col_writer
3793 .typed::<Int32Type>()
3794 .write_batch(&[1], Some(&[1]), None)
3795 .unwrap();
3796 col_writer.close().unwrap();
3797 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3799 col_writer
3800 .typed::<Int32Type>()
3801 .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
3802 .unwrap();
3803 col_writer.close().unwrap();
3804 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3806 col_writer
3807 .typed::<Int32Type>()
3808 .write_batch(&[1], Some(&[1]), Some(&[0]))
3809 .unwrap();
3810 col_writer.close().unwrap();
3811
3812 let rg_md = row_group_writer.close().unwrap();
3813 assert_eq!(rg_md.num_rows(), 1);
3814 writer.close().unwrap();
3815
3816 let bytes = Bytes::from(buf);
3817
3818 let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
3819 let full = no_mask.next().unwrap().unwrap();
3820
3821 assert_eq!(full.num_columns(), 3);
3822
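// Each single-leaf projection must return the same values as the corresponding column of the
// full, unprojected read.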
3823 for idx in 0..3 {
3824 let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
3825 let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
3826 let mut reader = b.with_projection(mask).build().unwrap();
3827 let projected = reader.next().unwrap().unwrap();
3828
3829 assert_eq!(projected.num_columns(), 1);
3830 assert_eq!(full.column(idx), projected.column(0));
3831 }
3832 }
3833
3834 #[test]
3835 fn test_read_lz4_raw() {
3836 let testdata = arrow::util::test_util::parquet_test_data();
3837 let path = format!("{testdata}/lz4_raw_compressed.parquet");
3838 let file = File::open(path).unwrap();
3839
3840 let batches = ParquetRecordBatchReader::try_new(file, 1024)
3841 .unwrap()
3842 .collect::<Result<Vec<_>, _>>()
3843 .unwrap();
3844 assert_eq!(batches.len(), 1);
3845 let batch = &batches[0];
3846
3847 assert_eq!(batch.num_columns(), 3);
3848 assert_eq!(batch.num_rows(), 4);
3849
3850 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3852 assert_eq!(
3853 a.values(),
3854 &[1593604800, 1593604800, 1593604801, 1593604801]
3855 );
3856
3857 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3858 let a: Vec<_> = a.iter().flatten().collect();
3859 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
3860
3861 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
3862 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
3863 }
3864
3865 #[test]
3875 fn test_read_lz4_hadoop_fallback() {
3876 for file in [
3877 "hadoop_lz4_compressed.parquet",
3878 "non_hadoop_lz4_compressed.parquet",
3879 ] {
3880 let testdata = arrow::util::test_util::parquet_test_data();
3881 let path = format!("{testdata}/{file}");
3882 let file = File::open(path).unwrap();
3883 let expected_rows = 4;
3884
3885 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
3886 .unwrap()
3887 .collect::<Result<Vec<_>, _>>()
3888 .unwrap();
3889 assert_eq!(batches.len(), 1);
3890 let batch = &batches[0];
3891
3892 assert_eq!(batch.num_columns(), 3);
3893 assert_eq!(batch.num_rows(), expected_rows);
3894
3895 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3896 assert_eq!(
3897 a.values(),
3898 &[1593604800, 1593604800, 1593604801, 1593604801]
3899 );
3900
3901 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3902 let b: Vec<_> = b.iter().flatten().collect();
3903 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
3904
3905 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
3906 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
3907 }
3908 }
3909
3910 #[test]
3911 fn test_read_lz4_hadoop_large() {
3912 let testdata = arrow::util::test_util::parquet_test_data();
3913 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
3914 let file = File::open(path).unwrap();
3915 let expected_rows = 10000;
3916
3917 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
3918 .unwrap()
3919 .collect::<Result<Vec<_>, _>>()
3920 .unwrap();
3921 assert_eq!(batches.len(), 1);
3922 let batch = &batches[0];
3923
3924 assert_eq!(batch.num_columns(), 1);
3925 assert_eq!(batch.num_rows(), expected_rows);
3926
3927 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
3928 let a: Vec<_> = a.iter().flatten().collect();
3929 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
3930 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
3931 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
3932 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
3933 }
3934
3935 #[test]
3936 #[cfg(feature = "snap")]
3937 fn test_read_nested_lists() {
3938 let testdata = arrow::util::test_util::parquet_test_data();
3939 let path = format!("{testdata}/nested_lists.snappy.parquet");
3940 let file = File::open(path).unwrap();
3941
3942 let f = file.try_clone().unwrap();
3943 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
3944 let expected = reader.next().unwrap().unwrap();
3945 assert_eq!(expected.num_rows(), 3);
3946
3947 let selection = RowSelection::from(vec![
3948 RowSelector::skip(1),
3949 RowSelector::select(1),
3950 RowSelector::skip(1),
3951 ]);
3952 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
3953 .unwrap()
3954 .with_row_selection(selection)
3955 .build()
3956 .unwrap();
3957
3958 let actual = reader.next().unwrap().unwrap();
3959 assert_eq!(actual.num_rows(), 1);
3960 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
3961 }
3962
3963 #[test]
3964 fn test_arbitrary_decimal() {
3965 let values = [1, 2, 3, 4, 5, 6, 7, 8];
3966 let decimals_19_0 = Decimal128Array::from_iter_values(values)
3967 .with_precision_and_scale(19, 0)
3968 .unwrap();
3969 let decimals_12_0 = Decimal128Array::from_iter_values(values)
3970 .with_precision_and_scale(12, 0)
3971 .unwrap();
3972 let decimals_17_10 = Decimal128Array::from_iter_values(values)
3973 .with_precision_and_scale(17, 10)
3974 .unwrap();
3975
3976 let written = RecordBatch::try_from_iter([
3977 ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
3978 ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
3979 ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
3980 ])
3981 .unwrap();
3982
3983 let mut buffer = Vec::with_capacity(1024);
3984 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
3985 writer.write(&written).unwrap();
3986 writer.close().unwrap();
3987
3988 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
3989 .unwrap()
3990 .collect::<Result<Vec<_>, _>>()
3991 .unwrap();
3992
3993 assert_eq!(&written.slice(0, 8), &read[0]);
3994 }
3995
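// With data_page_row_count_limit(1) the writer aims to put each list row in its own data page;
// skipping the first two rows must still return exactly the last row.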
3996 #[test]
3997 fn test_list_skip() {
3998 let mut list = ListBuilder::new(Int32Builder::new());
3999 list.append_value([Some(1), Some(2)]);
4000 list.append_value([Some(3)]);
4001 list.append_value([Some(4)]);
4002 let list = list.finish();
4003 let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4004
4005 let props = WriterProperties::builder()
4007 .set_data_page_row_count_limit(1)
4008 .set_write_batch_size(2)
4009 .build();
4010
4011 let mut buffer = Vec::with_capacity(1024);
4012 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4013 writer.write(&batch).unwrap();
4014 writer.close().unwrap();
4015
4016 let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4017 let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4018 .unwrap()
4019 .with_row_selection(selection.into())
4020 .build()
4021 .unwrap();
4022 let out = reader.next().unwrap().unwrap();
4023 assert_eq!(out.num_rows(), 1);
4024 assert_eq!(out, batch.slice(2, 1));
4025 }
4026
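// Round-trips decimals at several precisions and checks the physical type chosen by the writer:
// precision <= 9 maps to INT32, <= 18 to INT64, and anything larger to FIXED_LEN_BYTE_ARRAY.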
4027 fn test_decimal_roundtrip<T: DecimalType>() {
4028 let d = |values: Vec<usize>, p: u8| {
4033 let iter = values.into_iter().map(T::Native::usize_as);
4034 PrimitiveArray::<T>::from_iter_values(iter)
4035 .with_precision_and_scale(p, 2)
4036 .unwrap()
4037 };
4038
4039 let d1 = d(vec![1, 2, 3, 4, 5], 9);
4040 let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4041 let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4042 let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4043
4044 let batch = RecordBatch::try_from_iter([
4045 ("d1", Arc::new(d1) as ArrayRef),
4046 ("d2", Arc::new(d2) as ArrayRef),
4047 ("d3", Arc::new(d3) as ArrayRef),
4048 ("d4", Arc::new(d4) as ArrayRef),
4049 ])
4050 .unwrap();
4051
4052 let mut buffer = Vec::with_capacity(1024);
4053 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4054 writer.write(&batch).unwrap();
4055 writer.close().unwrap();
4056
4057 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4058 let t1 = builder.parquet_schema().columns()[0].physical_type();
4059 assert_eq!(t1, PhysicalType::INT32);
4060 let t2 = builder.parquet_schema().columns()[1].physical_type();
4061 assert_eq!(t2, PhysicalType::INT64);
4062 let t3 = builder.parquet_schema().columns()[2].physical_type();
4063 assert_eq!(t3, PhysicalType::INT64);
4064 let t4 = builder.parquet_schema().columns()[3].physical_type();
4065 assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4066
4067 let mut reader = builder.build().unwrap();
4068 assert_eq!(batch.schema(), reader.schema());
4069
4070 let out = reader.next().unwrap().unwrap();
4071 assert_eq!(batch, out);
4072 }
4073
4074 #[test]
4075 fn test_decimal() {
4076 test_decimal_roundtrip::<Decimal128Type>();
4077 test_decimal_roundtrip::<Decimal256Type>();
4078 }
4079
4080 #[test]
4081 fn test_list_selection() {
4082 let schema = Arc::new(Schema::new(vec![Field::new_list(
4083 "list",
4084 Field::new_list_field(ArrowDataType::Utf8, true),
4085 false,
4086 )]));
4087 let mut buf = Vec::with_capacity(1024);
4088
4089 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4090
4091 for i in 0..2 {
4092 let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4093 for j in 0..1024 {
4094 list_a_builder.values().append_value(format!("{i} {j}"));
4095 list_a_builder.append(true);
4096 }
4097 let batch =
4098 RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4099 .unwrap();
4100 writer.write(&batch).unwrap();
4101 }
4102 let _metadata = writer.close().unwrap();
4103
4104 let buf = Bytes::from(buf);
4105 let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4106 .unwrap()
4107 .with_row_selection(RowSelection::from(vec![
4108 RowSelector::skip(100),
4109 RowSelector::select(924),
4110 RowSelector::skip(100),
4111 RowSelector::select(924),
4112 ]))
4113 .build()
4114 .unwrap();
4115
4116 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4117 let batch = concat_batches(&schema, &batches).unwrap();
4118
4119 assert_eq!(batch.num_rows(), 924 * 2);
4120 let list = batch.column(0).as_list::<i32>();
4121
4122 for w in list.value_offsets().windows(2) {
4123 assert_eq!(w[0] + 1, w[1])
4124 }
4125 let mut values = list.values().as_string::<i32>().iter();
4126
4127 for i in 0..2 {
4128 for j in 100..1024 {
4129 let expected = format!("{i} {j}");
4130 assert_eq!(values.next().unwrap().unwrap(), &expected);
4131 }
4132 }
4133 }
4134
4135 #[test]
4136 fn test_list_selection_fuzz() {
4137 let mut rng = thread_rng();
4138 let schema = Arc::new(Schema::new(vec![Field::new_list(
4139 "list",
4140 Field::new_list(
4141 Field::LIST_FIELD_DEFAULT_NAME,
4142 Field::new_list_field(ArrowDataType::Int32, true),
4143 true,
4144 ),
4145 true,
4146 )]));
4147 let mut buf = Vec::with_capacity(1024);
4148 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4149
4150 let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));
4151
4152 for _ in 0..2048 {
4153 if rng.gen_bool(0.2) {
4154 list_a_builder.append(false);
4155 continue;
4156 }
4157
4158 let list_a_len = rng.gen_range(0..10);
4159 let list_b_builder = list_a_builder.values();
4160
4161 for _ in 0..list_a_len {
4162 if rng.gen_bool(0.2) {
4163 list_b_builder.append(false);
4164 continue;
4165 }
4166
4167 let list_b_len = rng.gen_range(0..10);
4168 let int_builder = list_b_builder.values();
4169 for _ in 0..list_b_len {
4170 match rng.gen_bool(0.2) {
4171 true => int_builder.append_null(),
4172 false => int_builder.append_value(rng.gen()),
4173 }
4174 }
4175 list_b_builder.append(true)
4176 }
4177 list_a_builder.append(true);
4178 }
4179
4180 let array = Arc::new(list_a_builder.finish());
4181 let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
4182
4183 writer.write(&batch).unwrap();
4184 let _metadata = writer.close().unwrap();
4185
4186 let buf = Bytes::from(buf);
4187
4188 let cases = [
4189 vec![
4190 RowSelector::skip(100),
4191 RowSelector::select(924),
4192 RowSelector::skip(100),
4193 RowSelector::select(924),
4194 ],
4195 vec![
4196 RowSelector::select(924),
4197 RowSelector::skip(100),
4198 RowSelector::select(924),
4199 RowSelector::skip(100),
4200 ],
4201 vec![
4202 RowSelector::skip(1023),
4203 RowSelector::select(1),
4204 RowSelector::skip(1023),
4205 RowSelector::select(1),
4206 ],
4207 vec![
4208 RowSelector::select(1),
4209 RowSelector::skip(1023),
4210 RowSelector::select(1),
4211 RowSelector::skip(1023),
4212 ],
4213 ];
4214
4215 for batch_size in [100, 1024, 2048] {
4216 for selection in &cases {
4217 let selection = RowSelection::from(selection.clone());
4218 let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
4219 .unwrap()
4220 .with_row_selection(selection.clone())
4221 .with_batch_size(batch_size)
4222 .build()
4223 .unwrap();
4224
4225 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4226 let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
4227 assert_eq!(actual.num_rows(), selection.row_count());
4228
4229 let mut batch_offset = 0;
4230 let mut actual_offset = 0;
4231 for selector in selection.iter() {
4232 if selector.skip {
4233 batch_offset += selector.row_count;
4234 continue;
4235 }
4236
4237 assert_eq!(
4238 batch.slice(batch_offset, selector.row_count),
4239 actual.slice(actual_offset, selector.row_count)
4240 );
4241
4242 batch_offset += selector.row_count;
4243 actual_offset += selector.row_count;
4244 }
4245 }
4246 }
4247 }
4248
4249 #[test]
4250 fn test_read_old_nested_list() {
4251 use arrow::datatypes::DataType;
4252 use arrow::datatypes::ToByteSlice;
4253
4254 let testdata = arrow::util::test_util::parquet_test_data();
4255 let path = format!("{testdata}/old_list_structure.parquet");
4264 let test_file = File::open(path).unwrap();
4265
4266 let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4268
4269 let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4271
4272 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4274 "array",
4275 DataType::Int32,
4276 false,
4277 ))))
4278 .len(2)
4279 .add_buffer(a_value_offsets)
4280 .add_child_data(a_values.into_data())
4281 .build()
4282 .unwrap();
4283 let a = ListArray::from(a_list_data);
4284
4285 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4286 let mut reader = builder.build().unwrap();
4287 let out = reader.next().unwrap().unwrap();
4288 assert_eq!(out.num_rows(), 1);
4289 assert_eq!(out.num_columns(), 1);
4290 let c0 = out.column(0);
4292 let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4293 let r0 = c0arr.value(0);
4295 let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4296 assert_eq!(r0arr, &a);
4297 }
4298
4299 #[test]
4300 fn test_map_no_value() {
4301 let testdata = arrow::util::test_util::parquet_test_data();
4321 let path = format!("{testdata}/map_no_value.parquet");
4322 let file = File::open(path).unwrap();
4323
4324 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4325 .unwrap()
4326 .build()
4327 .unwrap();
4328 let out = reader.next().unwrap().unwrap();
4329 assert_eq!(out.num_rows(), 3);
4330 assert_eq!(out.num_columns(), 3);
4331 let c0 = out.column(1).as_list::<i32>();
4333 let c1 = out.column(2).as_list::<i32>();
4334 assert_eq!(c0.len(), c1.len());
4335 c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4336 }
4337}