1use arrow_array::cast::AsArray;
21use arrow_array::Array;
22use arrow_array::{RecordBatch, RecordBatchReader};
23use arrow_schema::{ArrowError, DataType as ArrowType, Schema, SchemaRef};
24use arrow_select::filter::prep_null_mask_filter;
25pub use filter::{ArrowPredicate, ArrowPredicateFn, RowFilter};
26pub use selection::{RowSelection, RowSelector};
27use std::collections::VecDeque;
28use std::sync::Arc;
29
30pub use crate::arrow::array_reader::RowGroups;
31use crate::arrow::array_reader::{build_array_reader, ArrayReader};
32use crate::arrow::schema::{parquet_to_arrow_schema_and_fields, ParquetField};
33use crate::arrow::{parquet_to_arrow_field_levels, FieldLevels, ProjectionMask};
34use crate::column::page::{PageIterator, PageReader};
35#[cfg(feature = "encryption")]
36use crate::encryption::decrypt::FileDecryptionProperties;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
39use crate::file::reader::{ChunkReader, SerializedPageReader};
40use crate::schema::types::SchemaDescriptor;
41
42mod filter;
43mod selection;
44pub mod statistics;
45
/// Builder for constructing an Arrow `RecordBatch` reader over Parquet data.
///
/// Generic over the input source `T`; concrete aliases (e.g.
/// [`ParquetRecordBatchReaderBuilder`]) fix the input type.
pub struct ArrowReaderBuilder<T> {
    /// The underlying byte source the reader will pull data from
    pub(crate) input: T,

    /// Parsed Parquet file metadata (footer, and optionally page indexes)
    pub(crate) metadata: Arc<ParquetMetaData>,

    /// Arrow schema derived from (or supplied for) the Parquet schema
    pub(crate) schema: SchemaRef,

    /// Parquet-to-Arrow field mapping used to build array readers, if any
    pub(crate) fields: Option<Arc<ParquetField>>,

    /// Maximum number of rows per emitted `RecordBatch` (defaults to 1024)
    pub(crate) batch_size: usize,

    /// Row group indices to read; `None` means all row groups
    pub(crate) row_groups: Option<Vec<usize>>,

    /// Mask of leaf columns to decode
    pub(crate) projection: ProjectionMask,

    /// Optional predicate-based row filter evaluated during the scan
    pub(crate) filter: Option<RowFilter>,

    /// Optional pre-computed selection of rows to read
    pub(crate) selection: Option<RowSelection>,

    /// Maximum number of rows to return (applied after selection/offset)
    pub(crate) limit: Option<usize>,

    /// Number of leading rows to skip before returning rows
    pub(crate) offset: Option<usize>,
}
77
78impl<T> ArrowReaderBuilder<T> {
79 pub(crate) fn new_builder(input: T, metadata: ArrowReaderMetadata) -> Self {
80 Self {
81 input,
82 metadata: metadata.metadata,
83 schema: metadata.schema,
84 fields: metadata.fields,
85 batch_size: 1024,
86 row_groups: None,
87 projection: ProjectionMask::all(),
88 filter: None,
89 selection: None,
90 limit: None,
91 offset: None,
92 }
93 }
94
95 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
97 &self.metadata
98 }
99
100 pub fn parquet_schema(&self) -> &SchemaDescriptor {
102 self.metadata.file_metadata().schema_descr()
103 }
104
105 pub fn schema(&self) -> &SchemaRef {
107 &self.schema
108 }
109
110 pub fn with_batch_size(self, batch_size: usize) -> Self {
113 let batch_size = batch_size.min(self.metadata.file_metadata().num_rows() as usize);
115 Self { batch_size, ..self }
116 }
117
118 pub fn with_row_groups(self, row_groups: Vec<usize>) -> Self {
122 Self {
123 row_groups: Some(row_groups),
124 ..self
125 }
126 }
127
128 pub fn with_projection(self, mask: ProjectionMask) -> Self {
130 Self {
131 projection: mask,
132 ..self
133 }
134 }
135
136 pub fn with_row_selection(self, selection: RowSelection) -> Self {
196 Self {
197 selection: Some(selection),
198 ..self
199 }
200 }
201
202 pub fn with_row_filter(self, filter: RowFilter) -> Self {
209 Self {
210 filter: Some(filter),
211 ..self
212 }
213 }
214
215 pub fn with_limit(self, limit: usize) -> Self {
223 Self {
224 limit: Some(limit),
225 ..self
226 }
227 }
228
229 pub fn with_offset(self, offset: usize) -> Self {
237 Self {
238 offset: Some(offset),
239 ..self
240 }
241 }
242}
243
/// Options controlling how Parquet metadata is interpreted when building
/// an Arrow reader.
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
    /// If `true`, ignore any Arrow schema embedded in the Parquet
    /// key-value metadata and infer the schema from the Parquet schema alone
    skip_arrow_metadata: bool,
    /// A user-provided Arrow schema to use instead of inferring one;
    /// validated against the file's Parquet schema on load
    supplied_schema: Option<SchemaRef>,
    /// Whether to read the page index (offset indexes) from the file
    pub(crate) page_index: bool,
    /// Decryption properties used when reading encrypted files
    #[cfg(feature = "encryption")]
    pub(crate) file_decryption_properties: Option<FileDecryptionProperties>,
}
260
261impl ArrowReaderOptions {
262 pub fn new() -> Self {
264 Self::default()
265 }
266
267 pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
274 Self {
275 skip_arrow_metadata,
276 ..self
277 }
278 }
279
280 pub fn with_schema(self, schema: SchemaRef) -> Self {
327 Self {
328 supplied_schema: Some(schema),
329 skip_arrow_metadata: true,
330 ..self
331 }
332 }
333
334 pub fn with_page_index(self, page_index: bool) -> Self {
347 Self { page_index, ..self }
348 }
349
350 #[cfg(feature = "encryption")]
354 pub fn with_file_decryption_properties(
355 self,
356 file_decryption_properties: FileDecryptionProperties,
357 ) -> Self {
358 Self {
359 file_decryption_properties: Some(file_decryption_properties),
360 ..self
361 }
362 }
363}
364
/// The metadata required to construct an Arrow reader: parsed Parquet
/// metadata plus the derived (or validated user-supplied) Arrow schema.
#[derive(Debug, Clone)]
pub struct ArrowReaderMetadata {
    /// Parsed Parquet file metadata
    pub(crate) metadata: Arc<ParquetMetaData>,
    /// The Arrow schema batches will be read with
    pub(crate) schema: SchemaRef,

    /// Parquet-to-Arrow field mapping used to build array readers, if any
    pub(crate) fields: Option<Arc<ParquetField>>,
}
388
389impl ArrowReaderMetadata {
390 pub fn load<T: ChunkReader>(reader: &T, options: ArrowReaderOptions) -> Result<Self> {
401 let metadata = ParquetMetaDataReader::new().with_page_indexes(options.page_index);
402 #[cfg(feature = "encryption")]
403 let metadata =
404 metadata.with_decryption_properties(options.file_decryption_properties.as_ref());
405 let metadata = metadata.parse_and_finish(reader)?;
406 Self::try_new(Arc::new(metadata), options)
407 }
408
409 pub fn try_new(metadata: Arc<ParquetMetaData>, options: ArrowReaderOptions) -> Result<Self> {
416 match options.supplied_schema {
417 Some(supplied_schema) => Self::with_supplied_schema(metadata, supplied_schema.clone()),
418 None => {
419 let kv_metadata = match options.skip_arrow_metadata {
420 true => None,
421 false => metadata.file_metadata().key_value_metadata(),
422 };
423
424 let (schema, fields) = parquet_to_arrow_schema_and_fields(
425 metadata.file_metadata().schema_descr(),
426 ProjectionMask::all(),
427 kv_metadata,
428 )?;
429
430 Ok(Self {
431 metadata,
432 schema: Arc::new(schema),
433 fields: fields.map(Arc::new),
434 })
435 }
436 }
437 }
438
439 fn with_supplied_schema(
440 metadata: Arc<ParquetMetaData>,
441 supplied_schema: SchemaRef,
442 ) -> Result<Self> {
443 let parquet_schema = metadata.file_metadata().schema_descr();
444 let field_levels = parquet_to_arrow_field_levels(
445 parquet_schema,
446 ProjectionMask::all(),
447 Some(supplied_schema.fields()),
448 )?;
449 let fields = field_levels.fields;
450 let inferred_len = fields.len();
451 let supplied_len = supplied_schema.fields().len();
452 if inferred_len != supplied_len {
456 Err(arrow_err!(format!(
457 "incompatible arrow schema, expected {} columns received {}",
458 inferred_len, supplied_len
459 )))
460 } else {
461 let diff_fields: Vec<_> = supplied_schema
462 .fields()
463 .iter()
464 .zip(fields.iter())
465 .filter_map(|(field1, field2)| {
466 if field1 != field2 {
467 Some(field1.name().clone())
468 } else {
469 None
470 }
471 })
472 .collect();
473
474 if !diff_fields.is_empty() {
475 Err(ParquetError::ArrowError(format!(
476 "incompatible arrow schema, the following fields could not be cast: [{}]",
477 diff_fields.join(", ")
478 )))
479 } else {
480 Ok(Self {
481 metadata,
482 schema: supplied_schema,
483 fields: field_levels.levels.map(Arc::new),
484 })
485 }
486 }
487 }
488
489 pub fn metadata(&self) -> &Arc<ParquetMetaData> {
491 &self.metadata
492 }
493
494 pub fn parquet_schema(&self) -> &SchemaDescriptor {
496 self.metadata.file_metadata().schema_descr()
497 }
498
499 pub fn schema(&self) -> &SchemaRef {
501 &self.schema
502 }
503}
504
/// Wrapper marking a synchronous (blocking) [`ChunkReader`] input source
/// for [`ArrowReaderBuilder`].
#[doc(hidden)]
pub struct SyncReader<T: ChunkReader>(T);

/// Builder alias for constructing a [`ParquetRecordBatchReader`] from a
/// synchronous [`ChunkReader`] input.
pub type ParquetRecordBatchReaderBuilder<T> = ArrowReaderBuilder<SyncReader<T>>;
impl<T: ChunkReader + 'static> ParquetRecordBatchReaderBuilder<T> {
    /// Creates a builder over `reader` with default [`ArrowReaderOptions`]
    pub fn try_new(reader: T) -> Result<Self> {
        Self::try_new_with_options(reader, Default::default())
    }

    /// Creates a builder over `reader`, loading metadata per `options`
    pub fn try_new_with_options(reader: T, options: ArrowReaderOptions) -> Result<Self> {
        let metadata = ArrowReaderMetadata::load(&reader, options)?;
        Ok(Self::new_with_metadata(reader, metadata))
    }

    /// Creates a builder from pre-loaded metadata, avoiding a re-parse of
    /// the footer
    pub fn new_with_metadata(input: T, metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(SyncReader(input), metadata)
    }

    /// Consumes the builder and constructs the [`ParquetRecordBatchReader`].
    ///
    /// Any configured [`RowFilter`] predicates are evaluated eagerly here,
    /// each one narrowing the row selection before the main scan is built.
    pub fn build(self) -> Result<ParquetRecordBatchReader> {
        // Clamp the batch size to the file's row count
        let batch_size = self
            .batch_size
            .min(self.metadata.file_metadata().num_rows() as usize);

        // Default to reading every row group, in order
        let row_groups = self
            .row_groups
            .unwrap_or_else(|| (0..self.metadata.num_row_groups()).collect());

        let reader = ReaderRowGroups {
            reader: Arc::new(self.input.0),
            metadata: self.metadata,
            row_groups,
        };

        let mut filter = self.filter;
        let mut selection = self.selection;

        // Evaluate each predicate in turn, intersecting its result with the
        // running selection. Each predicate only decodes the columns its own
        // projection requires.
        if let Some(filter) = filter.as_mut() {
            for predicate in filter.predicates.iter_mut() {
                // Once nothing is selected, further predicates cannot change
                // the outcome — stop early
                if !selects_any(selection.as_ref()) {
                    break;
                }

                let array_reader =
                    build_array_reader(self.fields.as_deref(), predicate.projection(), &reader)?;

                selection = Some(evaluate_predicate(
                    batch_size,
                    array_reader,
                    selection,
                    predicate.as_mut(),
                )?);
            }
        }

        // Reader for the user-requested projection (independent of the
        // predicate projections above)
        let array_reader = build_array_reader(self.fields.as_deref(), &self.projection, &reader)?;

        // Normalize "selects nothing" to an explicit empty selection
        if !selects_any(selection.as_ref()) {
            selection = Some(RowSelection::from(vec![]));
        }

        Ok(ParquetRecordBatchReader::new(
            batch_size,
            array_reader,
            apply_range(selection, reader.num_rows(), self.offset, self.limit),
        ))
    }
}
650
/// A [`RowGroups`] implementation backed by a [`ChunkReader`], restricted
/// to a subset of the file's row groups.
struct ReaderRowGroups<T: ChunkReader> {
    /// Shared handle to the underlying byte source
    reader: Arc<T>,

    /// Parsed file metadata
    metadata: Arc<ParquetMetaData>,
    /// Indices of the row groups to expose, in read order
    row_groups: Vec<usize>,
}
658
659impl<T: ChunkReader + 'static> RowGroups for ReaderRowGroups<T> {
660 fn num_rows(&self) -> usize {
661 let meta = self.metadata.row_groups();
662 self.row_groups
663 .iter()
664 .map(|x| meta[*x].num_rows() as usize)
665 .sum()
666 }
667
668 fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
669 Ok(Box::new(ReaderPageIterator {
670 column_idx: i,
671 reader: self.reader.clone(),
672 metadata: self.metadata.clone(),
673 row_groups: self.row_groups.clone().into_iter(),
674 }))
675 }
676}
677
/// Iterator over the page readers of one leaf column, advancing through a
/// fixed list of row groups.
struct ReaderPageIterator<T: ChunkReader> {
    /// Shared handle to the underlying byte source
    reader: Arc<T>,
    /// Index of the leaf column whose pages are read
    column_idx: usize,
    /// Remaining row group indices to visit
    row_groups: std::vec::IntoIter<usize>,
    /// Parsed file metadata
    metadata: Arc<ParquetMetaData>,
}
684
impl<T: ChunkReader + 'static> ReaderPageIterator<T> {
    /// Builds a [`SerializedPageReader`] for this iterator's column within
    /// row group `rg_idx`, attaching a decryption context when applicable.
    fn next_page_reader(&mut self, rg_idx: usize) -> Result<SerializedPageReader<T>> {
        let rg = self.metadata.row_group(rg_idx);
        let column_chunk_metadata = rg.column(self.column_idx);
        let offset_index = self.metadata.offset_index();
        // Only use page locations when this row group's offset-index entry is
        // non-empty. NOTE(review): an empty entry appears to mean the offset
        // index is unavailable for this row group — confirm against the
        // metadata reader's behavior.
        let page_locations = offset_index
            .filter(|i| !i[rg_idx].is_empty())
            .map(|i| i[rg_idx][self.column_idx].page_locations.clone());
        let total_rows = rg.num_rows() as usize;
        let reader = self.reader.clone();

        SerializedPageReader::new(reader, column_chunk_metadata, total_rows, page_locations)?
            .add_crypto_context(
                rg_idx,
                self.column_idx,
                self.metadata.as_ref(),
                column_chunk_metadata,
            )
    }
}
708
709impl<T: ChunkReader + 'static> Iterator for ReaderPageIterator<T> {
710 type Item = Result<Box<dyn PageReader>>;
711
712 fn next(&mut self) -> Option<Self::Item> {
713 let rg_idx = self.row_groups.next()?;
714 let page_reader = self
715 .next_page_reader(rg_idx)
716 .map(|page_reader| Box::new(page_reader) as _);
717 Some(page_reader)
718 }
719}
720
// Marker impl: `ReaderPageIterator` already yields
// `Result<Box<dyn PageReader>>`, which is all `PageIterator` requires.
impl<T: ChunkReader + 'static> PageIterator for ReaderPageIterator<T> {}
722
/// An iterator of [`RecordBatch`]es decoded from a Parquet source.
pub struct ParquetRecordBatchReader {
    /// Maximum number of rows per emitted batch
    batch_size: usize,
    /// Root array reader producing the top-level struct array
    array_reader: Box<dyn ArrayReader>,
    /// Arrow schema of emitted batches
    schema: SchemaRef,
    /// Remaining select/skip runs to apply; `None` reads all rows
    selection: Option<VecDeque<RowSelector>>,
}
731
impl Iterator for ParquetRecordBatchReader {
    type Item = Result<RecordBatch, ArrowError>;

    /// Decodes up to `batch_size` rows into the next [`RecordBatch`].
    ///
    /// With a selection, select/skip runs are consumed from the front of the
    /// queue; a select run larger than the remaining batch capacity is split,
    /// with the unread remainder pushed back. Returns `None` when no rows
    /// remain.
    fn next(&mut self) -> Option<Self::Item> {
        let mut read_records = 0;
        match self.selection.as_mut() {
            Some(selection) => {
                while read_records < self.batch_size && !selection.is_empty() {
                    let front = selection.pop_front().unwrap();
                    if front.skip {
                        // Skip runs don't count toward the batch; the reader
                        // must skip exactly the requested number of rows
                        let skipped = match self.array_reader.skip_records(front.row_count) {
                            Ok(skipped) => skipped,
                            Err(e) => return Some(Err(e.into())),
                        };

                        if skipped != front.row_count {
                            return Some(Err(general_err!(
                                "failed to skip rows, expected {}, got {}",
                                front.row_count,
                                skipped
                            )
                            .into()));
                        }
                        continue;
                    }

                    // Empty select runs carry no rows; drop them
                    if front.row_count == 0 {
                        continue;
                    }

                    let need_read = self.batch_size - read_records;
                    // If this run exceeds the remaining batch capacity, read
                    // only what fits and push the remainder back for the next
                    // call to `next`
                    let to_read = match front.row_count.checked_sub(need_read) {
                        Some(remaining) if remaining != 0 => {
                            selection.push_front(RowSelector::select(remaining));
                            need_read
                        }
                        _ => front.row_count,
                    };
                    match self.array_reader.read_records(to_read) {
                        Ok(0) => break,
                        Ok(rec) => read_records += rec,
                        Err(error) => return Some(Err(error.into())),
                    }
                }
            }
            // No selection: read a full batch worth of rows
            None => {
                if let Err(error) = self.array_reader.read_records(self.batch_size) {
                    return Some(Err(error.into()));
                }
            }
        };

        // Materialize whatever was buffered by read_records above
        match self.array_reader.consume_batch() {
            Err(error) => Some(Err(error.into())),
            Ok(array) => {
                let struct_array = array.as_struct_opt().ok_or_else(|| {
                    ArrowError::ParquetError(
                        "Struct array reader should return struct array".to_string(),
                    )
                });

                match struct_array {
                    Err(err) => Some(Err(err)),
                    // An empty struct array means end of stream, not an
                    // empty batch
                    Ok(e) => (e.len() > 0).then(|| Ok(RecordBatch::from(e))),
                }
            }
        }
    }
}
806
807impl RecordBatchReader for ParquetRecordBatchReader {
808 fn schema(&self) -> SchemaRef {
813 self.schema.clone()
814 }
815}
816
817impl ParquetRecordBatchReader {
818 pub fn try_new<T: ChunkReader + 'static>(reader: T, batch_size: usize) -> Result<Self> {
822 ParquetRecordBatchReaderBuilder::try_new(reader)?
823 .with_batch_size(batch_size)
824 .build()
825 }
826
827 pub fn try_new_with_row_groups(
832 levels: &FieldLevels,
833 row_groups: &dyn RowGroups,
834 batch_size: usize,
835 selection: Option<RowSelection>,
836 ) -> Result<Self> {
837 let array_reader =
838 build_array_reader(levels.levels.as_ref(), &ProjectionMask::all(), row_groups)?;
839
840 Ok(Self {
841 batch_size,
842 array_reader,
843 schema: Arc::new(Schema::new(levels.fields.clone())),
844 selection: selection.map(|s| s.trim().into()),
845 })
846 }
847
848 pub(crate) fn new(
852 batch_size: usize,
853 array_reader: Box<dyn ArrayReader>,
854 selection: Option<RowSelection>,
855 ) -> Self {
856 let schema = match array_reader.get_data_type() {
857 ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
858 _ => unreachable!("Struct array reader's data type is not struct!"),
859 };
860
861 Self {
862 batch_size,
863 array_reader,
864 schema: Arc::new(schema),
865 selection: selection.map(|s| s.trim().into()),
866 }
867 }
868}
869
870pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
872 selection.map(|x| x.selects_any()).unwrap_or(true)
873}
874
/// Applies an optional row `offset` and `limit` on top of an existing
/// selection over `row_count` total rows, returning the combined selection.
///
/// Offset is applied first, then limit — matching SQL `OFFSET`/`LIMIT`
/// semantics.
pub(crate) fn apply_range(
    mut selection: Option<RowSelection>,
    row_count: usize,
    offset: Option<usize>,
    limit: Option<usize>,
) -> Option<RowSelection> {
    if let Some(offset) = offset {
        selection = Some(match row_count.checked_sub(offset) {
            // Offset past the end of the data: nothing is selected
            None => RowSelection::from(vec![]),
            Some(remaining) => selection
                .map(|selection| selection.offset(offset))
                .unwrap_or_else(|| {
                    // No prior selection: skip `offset` rows, take the rest
                    RowSelection::from(vec![
                        RowSelector::skip(offset),
                        RowSelector::select(remaining),
                    ])
                }),
        });
    }

    if let Some(limit) = limit {
        selection = Some(
            selection
                .map(|selection| selection.limit(limit))
                .unwrap_or_else(|| {
                    // No prior selection: take the first `limit` rows,
                    // clamped to the available row count
                    RowSelection::from(vec![RowSelector::select(limit.min(row_count))])
                }),
        );
    }
    selection
}
909
/// Evaluates `predicate` over the rows matched by `input_selection`,
/// returning the selection narrowed to the rows the predicate kept.
///
/// Reads batches of `batch_size` rows through `array_reader` (which should
/// project only the columns the predicate needs), applies the predicate to
/// each batch, and intersects the resulting filter with the input selection.
///
/// # Errors
///
/// Returns an error if decoding fails or the predicate produces a filter
/// whose length differs from the batch's row count.
pub(crate) fn evaluate_predicate(
    batch_size: usize,
    array_reader: Box<dyn ArrayReader>,
    input_selection: Option<RowSelection>,
    predicate: &mut dyn ArrowPredicate,
) -> Result<RowSelection> {
    // The selection is cloned because it is also needed below to intersect
    // with the predicate's output
    let reader = ParquetRecordBatchReader::new(batch_size, array_reader, input_selection.clone());
    let mut filters = vec![];
    for maybe_batch in reader {
        let maybe_batch = maybe_batch?;
        let input_rows = maybe_batch.num_rows();
        let filter = predicate.evaluate(maybe_batch)?;
        // A predicate must produce exactly one boolean per input row
        if filter.len() != input_rows {
            return Err(arrow_err!(
                "ArrowPredicate predicate returned {} rows, expected {input_rows}",
                filter.len()
            ));
        }
        // Nulls in the filter are treated as "not selected"
        match filter.null_count() {
            0 => filters.push(filter),
            _ => filters.push(prep_null_mask_filter(&filter)),
        };
    }

    // The filters are relative to the rows the input selection produced, so
    // compose them back onto that selection
    let raw = RowSelection::from_filters(&filters);
    Ok(match input_selection {
        Some(selection) => selection.and_then(&raw),
        None => raw,
    })
}
951
952#[cfg(test)]
953mod tests {
954 use std::cmp::min;
955 use std::collections::{HashMap, VecDeque};
956 use std::fmt::Formatter;
957 use std::fs::File;
958 use std::io::Seek;
959 use std::path::PathBuf;
960 use std::sync::Arc;
961
962 use bytes::Bytes;
963 use half::f16;
964 use num::PrimInt;
965 use rand::{rng, Rng, RngCore};
966 use tempfile::tempfile;
967
968 use arrow_array::builder::*;
969 use arrow_array::cast::AsArray;
970 use arrow_array::types::{
971 Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
972 Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
973 };
974 use arrow_array::*;
975 use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
976 use arrow_data::{ArrayData, ArrayDataBuilder};
977 use arrow_schema::{
978 ArrowError, DataType as ArrowDataType, Field, Fields, Schema, SchemaRef, TimeUnit,
979 };
980 use arrow_select::concat::concat_batches;
981
982 use crate::arrow::arrow_reader::{
983 ArrowPredicateFn, ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReader,
984 ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
985 };
986 use crate::arrow::schema::add_encoded_arrow_schema_to_metadata;
987 use crate::arrow::{ArrowWriter, ProjectionMask};
988 use crate::basic::{ConvertedType, Encoding, Repetition, Type as PhysicalType};
989 use crate::column::reader::decoder::REPETITION_LEVELS_BATCH_SIZE;
990 use crate::data_type::{
991 BoolType, ByteArray, ByteArrayType, DataType, FixedLenByteArray, FixedLenByteArrayType,
992 FloatType, Int32Type, Int64Type, Int96, Int96Type,
993 };
994 use crate::errors::Result;
995 use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
996 use crate::file::writer::SerializedFileWriter;
997 use crate::schema::parser::parse_message_type;
998 use crate::schema::types::{Type, TypePtr};
999 use crate::util::test_common::rand_gen::RandGen;
1000
1001 #[test]
1002 fn test_arrow_reader_all_columns() {
1003 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1004
1005 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1006 let original_schema = Arc::clone(builder.schema());
1007 let reader = builder.build().unwrap();
1008
1009 assert_eq!(original_schema.fields(), reader.schema().fields());
1011 }
1012
1013 #[test]
1014 fn test_arrow_reader_single_column() {
1015 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1016
1017 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1018 let original_schema = Arc::clone(builder.schema());
1019
1020 let mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
1021 let reader = builder.with_projection(mask).build().unwrap();
1022
1023 assert_eq!(1, reader.schema().fields().len());
1025 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1026 }
1027
1028 #[test]
1029 fn test_arrow_reader_single_column_by_name() {
1030 let file = get_test_file("parquet/generated_simple_numerics/blogs.parquet");
1031
1032 let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
1033 let original_schema = Arc::clone(builder.schema());
1034
1035 let mask = ProjectionMask::columns(builder.parquet_schema(), ["blog_id"]);
1036 let reader = builder.with_projection(mask).build().unwrap();
1037
1038 assert_eq!(1, reader.schema().fields().len());
1040 assert_eq!(original_schema.fields()[1], reader.schema().fields()[0]);
1041 }
1042
1043 #[test]
1044 fn test_null_column_reader_test() {
1045 let mut file = tempfile::tempfile().unwrap();
1046
1047 let schema = "
1048 message message {
1049 OPTIONAL INT32 int32;
1050 }
1051 ";
1052 let schema = Arc::new(parse_message_type(schema).unwrap());
1053
1054 let def_levels = vec![vec![0, 0, 0], vec![0, 0, 0, 0]];
1055 generate_single_column_file_with_data::<Int32Type>(
1056 &[vec![], vec![]],
1057 Some(&def_levels),
1058 file.try_clone().unwrap(), schema,
1060 Some(Field::new("int32", ArrowDataType::Null, true)),
1061 &Default::default(),
1062 )
1063 .unwrap();
1064
1065 file.rewind().unwrap();
1066
1067 let record_reader = ParquetRecordBatchReader::try_new(file, 2).unwrap();
1068 let batches = record_reader.collect::<Result<Vec<_>, _>>().unwrap();
1069
1070 assert_eq!(batches.len(), 4);
1071 for batch in &batches[0..3] {
1072 assert_eq!(batch.num_rows(), 2);
1073 assert_eq!(batch.num_columns(), 1);
1074 assert_eq!(batch.column(0).null_count(), 2);
1075 }
1076
1077 assert_eq!(batches[3].num_rows(), 1);
1078 assert_eq!(batches[3].num_columns(), 1);
1079 assert_eq!(batches[3].column(0).null_count(), 1);
1080 }
1081
1082 #[test]
1083 fn test_primitive_single_column_reader_test() {
1084 run_single_column_reader_tests::<BoolType, _, BoolType>(
1085 2,
1086 ConvertedType::NONE,
1087 None,
1088 |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())),
1089 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1090 );
1091 run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1092 2,
1093 ConvertedType::NONE,
1094 None,
1095 |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())),
1096 &[
1097 Encoding::PLAIN,
1098 Encoding::RLE_DICTIONARY,
1099 Encoding::DELTA_BINARY_PACKED,
1100 Encoding::BYTE_STREAM_SPLIT,
1101 ],
1102 );
1103 run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1104 2,
1105 ConvertedType::NONE,
1106 None,
1107 |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())),
1108 &[
1109 Encoding::PLAIN,
1110 Encoding::RLE_DICTIONARY,
1111 Encoding::DELTA_BINARY_PACKED,
1112 Encoding::BYTE_STREAM_SPLIT,
1113 ],
1114 );
1115 run_single_column_reader_tests::<FloatType, _, FloatType>(
1116 2,
1117 ConvertedType::NONE,
1118 None,
1119 |vals| Arc::new(Float32Array::from_iter(vals.iter().cloned())),
1120 &[Encoding::PLAIN, Encoding::BYTE_STREAM_SPLIT],
1121 );
1122 }
1123
1124 #[test]
1125 fn test_unsigned_primitive_single_column_reader_test() {
1126 run_single_column_reader_tests::<Int32Type, _, Int32Type>(
1127 2,
1128 ConvertedType::UINT_32,
1129 Some(ArrowDataType::UInt32),
1130 |vals| {
1131 Arc::new(UInt32Array::from_iter(
1132 vals.iter().map(|x| x.map(|x| x as u32)),
1133 ))
1134 },
1135 &[
1136 Encoding::PLAIN,
1137 Encoding::RLE_DICTIONARY,
1138 Encoding::DELTA_BINARY_PACKED,
1139 ],
1140 );
1141 run_single_column_reader_tests::<Int64Type, _, Int64Type>(
1142 2,
1143 ConvertedType::UINT_64,
1144 Some(ArrowDataType::UInt64),
1145 |vals| {
1146 Arc::new(UInt64Array::from_iter(
1147 vals.iter().map(|x| x.map(|x| x as u64)),
1148 ))
1149 },
1150 &[
1151 Encoding::PLAIN,
1152 Encoding::RLE_DICTIONARY,
1153 Encoding::DELTA_BINARY_PACKED,
1154 ],
1155 );
1156 }
1157
1158 #[test]
1159 fn test_unsigned_roundtrip() {
1160 let schema = Arc::new(Schema::new(vec![
1161 Field::new("uint32", ArrowDataType::UInt32, true),
1162 Field::new("uint64", ArrowDataType::UInt64, true),
1163 ]));
1164
1165 let mut buf = Vec::with_capacity(1024);
1166 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
1167
1168 let original = RecordBatch::try_new(
1169 schema,
1170 vec![
1171 Arc::new(UInt32Array::from_iter_values([
1172 0,
1173 i32::MAX as u32,
1174 u32::MAX,
1175 ])),
1176 Arc::new(UInt64Array::from_iter_values([
1177 0,
1178 i64::MAX as u64,
1179 u64::MAX,
1180 ])),
1181 ],
1182 )
1183 .unwrap();
1184
1185 writer.write(&original).unwrap();
1186 writer.close().unwrap();
1187
1188 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024).unwrap();
1189 let ret = reader.next().unwrap().unwrap();
1190 assert_eq!(ret, original);
1191
1192 ret.column(0)
1194 .as_any()
1195 .downcast_ref::<UInt32Array>()
1196 .unwrap();
1197
1198 ret.column(1)
1199 .as_any()
1200 .downcast_ref::<UInt64Array>()
1201 .unwrap();
1202 }
1203
1204 #[test]
1205 fn test_float16_roundtrip() -> Result<()> {
1206 let schema = Arc::new(Schema::new(vec![
1207 Field::new("float16", ArrowDataType::Float16, false),
1208 Field::new("float16-nullable", ArrowDataType::Float16, true),
1209 ]));
1210
1211 let mut buf = Vec::with_capacity(1024);
1212 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1213
1214 let original = RecordBatch::try_new(
1215 schema,
1216 vec![
1217 Arc::new(Float16Array::from_iter_values([
1218 f16::EPSILON,
1219 f16::MIN,
1220 f16::MAX,
1221 f16::NAN,
1222 f16::INFINITY,
1223 f16::NEG_INFINITY,
1224 f16::ONE,
1225 f16::NEG_ONE,
1226 f16::ZERO,
1227 f16::NEG_ZERO,
1228 f16::E,
1229 f16::PI,
1230 f16::FRAC_1_PI,
1231 ])),
1232 Arc::new(Float16Array::from(vec![
1233 None,
1234 None,
1235 None,
1236 Some(f16::NAN),
1237 Some(f16::INFINITY),
1238 Some(f16::NEG_INFINITY),
1239 None,
1240 None,
1241 None,
1242 None,
1243 None,
1244 None,
1245 Some(f16::FRAC_1_PI),
1246 ])),
1247 ],
1248 )?;
1249
1250 writer.write(&original)?;
1251 writer.close()?;
1252
1253 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1254 let ret = reader.next().unwrap()?;
1255 assert_eq!(ret, original);
1256
1257 ret.column(0).as_primitive::<Float16Type>();
1259 ret.column(1).as_primitive::<Float16Type>();
1260
1261 Ok(())
1262 }
1263
1264 #[test]
1265 fn test_time_utc_roundtrip() -> Result<()> {
1266 let schema = Arc::new(Schema::new(vec![
1267 Field::new(
1268 "time_millis",
1269 ArrowDataType::Time32(TimeUnit::Millisecond),
1270 true,
1271 )
1272 .with_metadata(HashMap::from_iter(vec![(
1273 "adjusted_to_utc".to_string(),
1274 "".to_string(),
1275 )])),
1276 Field::new(
1277 "time_micros",
1278 ArrowDataType::Time64(TimeUnit::Microsecond),
1279 true,
1280 )
1281 .with_metadata(HashMap::from_iter(vec![(
1282 "adjusted_to_utc".to_string(),
1283 "".to_string(),
1284 )])),
1285 ]));
1286
1287 let mut buf = Vec::with_capacity(1024);
1288 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1289
1290 let original = RecordBatch::try_new(
1291 schema,
1292 vec![
1293 Arc::new(Time32MillisecondArray::from(vec![
1294 Some(-1),
1295 Some(0),
1296 Some(86_399_000),
1297 Some(86_400_000),
1298 Some(86_401_000),
1299 None,
1300 ])),
1301 Arc::new(Time64MicrosecondArray::from(vec![
1302 Some(-1),
1303 Some(0),
1304 Some(86_399 * 1_000_000),
1305 Some(86_400 * 1_000_000),
1306 Some(86_401 * 1_000_000),
1307 None,
1308 ])),
1309 ],
1310 )?;
1311
1312 writer.write(&original)?;
1313 writer.close()?;
1314
1315 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1316 let ret = reader.next().unwrap()?;
1317 assert_eq!(ret, original);
1318
1319 ret.column(0).as_primitive::<Time32MillisecondType>();
1321 ret.column(1).as_primitive::<Time64MicrosecondType>();
1322
1323 Ok(())
1324 }
1325
1326 #[test]
1327 fn test_date32_roundtrip() -> Result<()> {
1328 use arrow_array::Date32Array;
1329
1330 let schema = Arc::new(Schema::new(vec![Field::new(
1331 "date32",
1332 ArrowDataType::Date32,
1333 false,
1334 )]));
1335
1336 let mut buf = Vec::with_capacity(1024);
1337
1338 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;
1339
1340 let original = RecordBatch::try_new(
1341 schema,
1342 vec![Arc::new(Date32Array::from(vec![
1343 -1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
1344 ]))],
1345 )?;
1346
1347 writer.write(&original)?;
1348 writer.close()?;
1349
1350 let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
1351 let ret = reader.next().unwrap()?;
1352 assert_eq!(ret, original);
1353
1354 ret.column(0).as_primitive::<Date32Type>();
1356
1357 Ok(())
1358 }
1359
1360 #[test]
1361 fn test_date64_roundtrip() -> Result<()> {
1362 use arrow_array::Date64Array;
1363
1364 let schema = Arc::new(Schema::new(vec![
1365 Field::new("small-date64", ArrowDataType::Date64, false),
1366 Field::new("big-date64", ArrowDataType::Date64, false),
1367 Field::new("invalid-date64", ArrowDataType::Date64, false),
1368 ]));
1369
1370 let mut default_buf = Vec::with_capacity(1024);
1371 let mut coerce_buf = Vec::with_capacity(1024);
1372
1373 let coerce_props = WriterProperties::builder().set_coerce_types(true).build();
1374
1375 let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
1376 let mut coerce_writer =
1377 ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;
1378
1379 static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;
1380
1381 let original = RecordBatch::try_new(
1382 schema,
1383 vec![
1384 Arc::new(Date64Array::from(vec![
1386 -1_000_000 * NUM_MILLISECONDS_IN_DAY,
1387 -1_000 * NUM_MILLISECONDS_IN_DAY,
1388 0,
1389 1_000 * NUM_MILLISECONDS_IN_DAY,
1390 1_000_000 * NUM_MILLISECONDS_IN_DAY,
1391 ])),
1392 Arc::new(Date64Array::from(vec![
1394 -10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1395 -1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1396 0,
1397 1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1398 10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
1399 ])),
1400 Arc::new(Date64Array::from(vec![
1402 -1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1403 -1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1404 1,
1405 1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1406 1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
1407 ])),
1408 ],
1409 )?;
1410
1411 default_writer.write(&original)?;
1412 coerce_writer.write(&original)?;
1413
1414 default_writer.close()?;
1415 coerce_writer.close()?;
1416
1417 let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
1418 let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;
1419
1420 let default_ret = default_reader.next().unwrap()?;
1421 let coerce_ret = coerce_reader.next().unwrap()?;
1422
1423 assert_eq!(default_ret, original);
1425
1426 assert_eq!(coerce_ret.column(0), original.column(0));
1428 assert_ne!(coerce_ret.column(1), original.column(1));
1429 assert_ne!(coerce_ret.column(2), original.column(2));
1430
1431 default_ret.column(0).as_primitive::<Date64Type>();
1433 coerce_ret.column(0).as_primitive::<Date64Type>();
1434
1435 Ok(())
1436 }
1437 struct RandFixedLenGen {}
1438
1439 impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
1440 fn gen(len: i32) -> FixedLenByteArray {
1441 let mut v = vec![0u8; len as usize];
1442 rng().fill_bytes(&mut v);
1443 ByteArray::from(v).into()
1444 }
1445 }
1446
1447 #[test]
1448 fn test_fixed_length_binary_column_reader() {
1449 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1450 20,
1451 ConvertedType::NONE,
1452 None,
1453 |vals| {
1454 let mut builder = FixedSizeBinaryBuilder::with_capacity(vals.len(), 20);
1455 for val in vals {
1456 match val {
1457 Some(b) => builder.append_value(b).unwrap(),
1458 None => builder.append_null(),
1459 }
1460 }
1461 Arc::new(builder.finish())
1462 },
1463 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1464 );
1465 }
1466
1467 #[test]
1468 fn test_interval_day_time_column_reader() {
1469 run_single_column_reader_tests::<FixedLenByteArrayType, _, RandFixedLenGen>(
1470 12,
1471 ConvertedType::INTERVAL,
1472 None,
1473 |vals| {
1474 Arc::new(
1475 vals.iter()
1476 .map(|x| {
1477 x.as_ref().map(|b| IntervalDayTime {
1478 days: i32::from_le_bytes(b.as_ref()[4..8].try_into().unwrap()),
1479 milliseconds: i32::from_le_bytes(
1480 b.as_ref()[8..12].try_into().unwrap(),
1481 ),
1482 })
1483 })
1484 .collect::<IntervalDayTimeArray>(),
1485 )
1486 },
1487 &[Encoding::PLAIN, Encoding::RLE_DICTIONARY],
1488 );
1489 }
1490
    #[test]
    fn test_int96_single_column_reader_test() {
        let encodings = &[Encoding::PLAIN, Encoding::RLE_DICTIONARY];

        // Each entry pairs an optional arrow type hint with the conversion
        // used to build the expected array from the raw INT96 values.
        type TypeHintAndConversionFunction =
            (Option<ArrowDataType>, fn(&[Option<Int96>]) -> ArrayRef);

        let resolutions: Vec<TypeHintAndConversionFunction> = vec![
            // No type hint: INT96 defaults to nanosecond timestamps
            (None, |vals: &[Option<Int96>]| {
                Arc::new(TimestampNanosecondArray::from_iter(
                    vals.iter().map(|x| x.map(|x| x.to_nanos())),
                )) as ArrayRef
            }),
            // Explicit hints at each timestamp resolution
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Second, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampSecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_seconds())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Millisecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMillisecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_millis())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampMicrosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_micros())),
                    )) as ArrayRef
                },
            ),
            (
                Some(ArrowDataType::Timestamp(TimeUnit::Nanosecond, None)),
                |vals: &[Option<Int96>]| {
                    Arc::new(TimestampNanosecondArray::from_iter(
                        vals.iter().map(|x| x.map(|x| x.to_nanos())),
                    )) as ArrayRef
                },
            ),
            // Type hint carrying a timezone must be preserved on read
            (
                Some(ArrowDataType::Timestamp(
                    TimeUnit::Second,
                    Some(Arc::from("-05:00")),
                )),
                |vals: &[Option<Int96>]| {
                    Arc::new(
                        TimestampSecondArray::from_iter(
                            vals.iter().map(|x| x.map(|x| x.to_seconds())),
                        )
                        .with_timezone("-05:00"),
                    ) as ArrayRef
                },
            ),
        ];

        resolutions.iter().for_each(|(arrow_type, converter)| {
            run_single_column_reader_tests::<Int96Type, _, Int96Type>(
                2,
                ConvertedType::NONE,
                arrow_type.clone(),
                converter,
                encodings,
            );
        })
    }
1565
    /// Generator producing valid UTF-8 byte arrays (the decimal string form of
    /// random i32 values) for the string reader tests below.
    struct RandUtf8Gen {}

    impl RandGen<ByteArrayType> for RandUtf8Gen {
        fn gen(len: i32) -> ByteArray {
            // Reuse the i32 generator and encode its value as an ASCII string
            Int32Type::gen(len).to_string().as_str().into()
        }
    }
1573
    #[test]
    fn test_utf8_single_column_reader_test() {
        // Builds a (Large)StringArray from raw byte arrays, assuming valid UTF-8
        fn string_converter<O: OffsetSizeTrait>(vals: &[Option<ByteArray>]) -> ArrayRef {
            Arc::new(GenericStringArray::<O>::from_iter(vals.iter().map(|x| {
                x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap())
            })))
        }

        let encodings = &[
            Encoding::PLAIN,
            Encoding::RLE_DICTIONARY,
            Encoding::DELTA_LENGTH_BYTE_ARRAY,
            Encoding::DELTA_BYTE_ARRAY,
        ];

        // Without a converted type the column reads back as plain binary
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::NONE,
            None,
            |vals| {
                Arc::new(BinaryArray::from_iter(
                    vals.iter().map(|x| x.as_ref().map(|x| x.data())),
                ))
            },
            encodings,
        );

        // ConvertedType::UTF8 with and without an explicit arrow type hint
        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            None,
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::Utf8),
            string_converter::<i32>,
            encodings,
        );

        run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
            2,
            ConvertedType::UTF8,
            Some(ArrowDataType::LargeUtf8),
            string_converter::<i64>,
            encodings,
        );

        // Dictionary-encoded output with narrow key types.
        // NOTE(review): the reduced row count (20) and 50% nulls presumably keep
        // the Int8/UInt8 key space from overflowing — confirm before changing.
        let small_key_types = [ArrowDataType::Int8, ArrowDataType::UInt8];
        for key in &small_key_types {
            for encoding in encodings {
                let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50);
                opts.encoding = *encoding;

                let data_type =
                    ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

                single_column_reader_test::<ByteArrayType, _, RandUtf8Gen>(
                    opts,
                    2,
                    ConvertedType::UTF8,
                    Some(data_type.clone()),
                    move |vals| {
                        let vals = string_converter::<i32>(vals);
                        arrow::compute::cast(&vals, &data_type).unwrap()
                    },
                );
            }
        }

        // Wider key types can run the full option matrix
        let key_types = [
            ArrowDataType::Int16,
            ArrowDataType::UInt16,
            ArrowDataType::Int32,
            ArrowDataType::UInt32,
            ArrowDataType::Int64,
            ArrowDataType::UInt64,
        ];

        for key in &key_types {
            let data_type =
                ArrowDataType::Dictionary(Box::new(key.clone()), Box::new(ArrowDataType::Utf8));

            run_single_column_reader_tests::<ByteArrayType, _, RandUtf8Gen>(
                2,
                ConvertedType::UTF8,
                Some(data_type.clone()),
                move |vals| {
                    let vals = string_converter::<i32>(vals);
                    arrow::compute::cast(&vals, &data_type).unwrap()
                },
                encodings,
            );
        }
    }
1690
    #[test]
    fn test_decimal_nullable_struct() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        // Struct validity mask 0b11101111 marks row 4 as null (LSB-first);
        // the child decimal column itself is declared non-nullable
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "decimals",
            decimals.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read back with batch size 3: expect slices of 3, 3 and 2 rows
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }
1726
    #[test]
    fn test_int32_nullable_struct() {
        let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);
        // Struct validity mask 0b11101111 marks row 4 as null (LSB-first);
        // the child int32 column itself is declared non-nullable
        let data = ArrayDataBuilder::new(ArrowDataType::Struct(Fields::from(vec![Field::new(
            "int32",
            int32.data_type().clone(),
            false,
        )])))
        .len(8)
        .null_bit_buffer(Some(Buffer::from(&[0b11101111])))
        .child_data(vec![int32.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("struct", Arc::new(StructArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read back with batch size 3: expect slices of 3, 3 and 2 rows
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 2), &read[2]);
    }
1759
    #[test]
    fn test_decimal_list() {
        let decimals = Decimal128Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]);

        // 7 list rows over 8 child values; offsets give lists of lengths
        // 0,1,2,0,1,1,3 and validity mask 0b01010111 marks rows 3 and 5 null
        let data = ArrayDataBuilder::new(ArrowDataType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read back with batch size 3: expect slices of 3, 3 and 1 rows
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
1794
    #[test]
    fn test_read_decimal_file() {
        use arrow_array::Decimal128Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        // (file name prefix, expected decimal precision) for each physical
        // representation of the same decimal column
        let file_variants = vec![
            ("byte_array", 4),
            ("fixed_length", 25),
            ("int32", 4),
            ("int64", 10),
        ];
        for (prefix, target_precision) in file_variants {
            let path = format!("{testdata}/{prefix}_decimal.parquet");
            let file = File::open(path).unwrap();
            let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

            let batch = record_reader.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 24);
            let col = batch
                .column(0)
                .as_any()
                .downcast_ref::<Decimal128Array>()
                .unwrap();

            // Every file holds the values 1.00 through 24.00 at scale 2
            let expected = 1..25;

            assert_eq!(col.precision(), target_precision);
            assert_eq!(col.scale(), 2);

            for (i, v) in expected.enumerate() {
                assert_eq!(col.value(i), v * 100_i128);
            }
        }
    }
1828
    #[test]
    fn test_read_float16_nonzeros_file() {
        use arrow_array::Float16Array;
        let testdata = arrow::util::test_util::parquet_test_data();
        // File contains: [null, 1.0, -2.0, NaN, +0.0, -1.0, -0.0, 2.0]
        let path = format!("{testdata}/float16_nonzeros_and_nans.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 8);
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Float16Array>()
            .unwrap();

        let f16_two = f16::ONE + f16::ONE;

        assert_eq!(col.null_count(), 1);
        assert!(col.is_null(0));
        assert_eq!(col.value(1), f16::ONE);
        assert_eq!(col.value(2), -f16_two);
        assert!(col.value(3).is_nan());
        // +0.0 and -0.0 compare equal, so check the sign bit explicitly
        assert_eq!(col.value(4), f16::ZERO);
        assert!(col.value(4).is_sign_positive());
        assert_eq!(col.value(5), f16::NEG_ONE);
        assert_eq!(col.value(6), f16::NEG_ZERO);
        assert!(col.value(6).is_sign_negative());
        assert_eq!(col.value(7), f16_two);
    }
1860
1861 #[test]
1862 fn test_read_float16_zeros_file() {
1863 use arrow_array::Float16Array;
1864 let testdata = arrow::util::test_util::parquet_test_data();
1865 let path = format!("{testdata}/float16_zeros_and_nans.parquet");
1867 let file = File::open(path).unwrap();
1868 let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();
1869
1870 let batch = record_reader.next().unwrap().unwrap();
1871 assert_eq!(batch.num_rows(), 3);
1872 let col = batch
1873 .column(0)
1874 .as_any()
1875 .downcast_ref::<Float16Array>()
1876 .unwrap();
1877
1878 assert_eq!(col.null_count(), 1);
1879 assert!(col.is_null(0));
1880 assert_eq!(col.value(1), f16::ZERO);
1881 assert!(col.value(1).is_sign_positive());
1882 assert!(col.value(2).is_nan());
1883 }
1884
    #[test]
    fn test_read_float32_float64_byte_stream_split() {
        let path = format!(
            "{}/byte_stream_split.zstd.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();
            let f32_col = batch.column(0).as_primitive::<Float32Type>();
            let f64_col = batch.column(1).as_primitive::<Float64Type>();

            // Expect every decoded value to lie strictly inside (-10, 10)
            for &x in f32_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
            for &x in f64_col.values() {
                assert!(x > -10.0);
                assert!(x < 10.0);
            }
        }
        assert_eq!(row_count, 300);
    }
1913
    #[test]
    fn test_read_extended_byte_stream_split() {
        // The file stores each column twice: a reference encoding at even
        // indices and BYTE_STREAM_SPLIT at the following odd index; every
        // pair must decode to identical values.
        let path = format!(
            "{}/byte_stream_split_extended.gzip.parquet",
            arrow::util::test_util::parquet_test_data(),
        );
        let file = File::open(path).unwrap();
        let record_reader = ParquetRecordBatchReader::try_new(file, 128).unwrap();

        let mut row_count = 0;
        for batch in record_reader {
            let batch = batch.unwrap();
            row_count += batch.num_rows();

            // float16 pair
            let f16_col = batch.column(0).as_primitive::<Float16Type>();
            let f16_bss = batch.column(1).as_primitive::<Float16Type>();
            assert_eq!(f16_col.len(), f16_bss.len());
            f16_col
                .iter()
                .zip(f16_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // float32 pair
            let f32_col = batch.column(2).as_primitive::<Float32Type>();
            let f32_bss = batch.column(3).as_primitive::<Float32Type>();
            assert_eq!(f32_col.len(), f32_bss.len());
            f32_col
                .iter()
                .zip(f32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // float64 pair
            let f64_col = batch.column(4).as_primitive::<Float64Type>();
            let f64_bss = batch.column(5).as_primitive::<Float64Type>();
            assert_eq!(f64_col.len(), f64_bss.len());
            f64_col
                .iter()
                .zip(f64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // int32 pair
            let i32_col = batch.column(6).as_primitive::<types::Int32Type>();
            let i32_bss = batch.column(7).as_primitive::<types::Int32Type>();
            assert_eq!(i32_col.len(), i32_bss.len());
            i32_col
                .iter()
                .zip(i32_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // int64 pair
            let i64_col = batch.column(8).as_primitive::<types::Int64Type>();
            let i64_bss = batch.column(9).as_primitive::<types::Int64Type>();
            assert_eq!(i64_col.len(), i64_bss.len());
            i64_col
                .iter()
                .zip(i64_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // fixed-length byte array pair
            let flba_col = batch.column(10).as_fixed_size_binary();
            let flba_bss = batch.column(11).as_fixed_size_binary();
            assert_eq!(flba_col.len(), flba_bss.len());
            flba_col
                .iter()
                .zip(flba_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));

            // decimal128 pair
            let dec_col = batch.column(12).as_primitive::<Decimal128Type>();
            let dec_bss = batch.column(13).as_primitive::<Decimal128Type>();
            assert_eq!(dec_col.len(), dec_bss.len());
            dec_col
                .iter()
                .zip(dec_bss.iter())
                .for_each(|(l, r)| assert_eq!(l.unwrap(), r.unwrap()));
        }
        assert_eq!(row_count, 200);
    }
1993
    #[test]
    fn test_read_incorrect_map_schema_file() {
        // The file declares its map with a non-standard inner schema (hence
        // the file name); verify the reader still exposes it with the
        // normalized key_value/key/value field names below.
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/incorrect_map_schema.parquet");
        let file = File::open(path).unwrap();
        let mut record_reader = ParquetRecordBatchReader::try_new(file, 32).unwrap();

        let batch = record_reader.next().unwrap().unwrap();
        assert_eq!(batch.num_rows(), 1);

        let expected_schema = Schema::new(Fields::from(vec![Field::new(
            "my_map",
            ArrowDataType::Map(
                Arc::new(Field::new(
                    "key_value",
                    ArrowDataType::Struct(Fields::from(vec![
                        Field::new("key", ArrowDataType::Utf8, false),
                        Field::new("value", ArrowDataType::Utf8, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]));
        assert_eq!(batch.schema().as_ref(), &expected_schema);

        assert_eq!(batch.num_rows(), 1);
        assert_eq!(batch.column(0).null_count(), 0);
        assert_eq!(
            batch.column(0).as_map().keys().as_ref(),
            &StringArray::from(vec!["parent", "name"])
        );
        assert_eq!(
            batch.column(0).as_map().values().as_ref(),
            &StringArray::from(vec!["another", "report"])
        );
    }
2033
    /// One permutation of writer/reader settings exercised by
    /// `run_single_column_reader_tests`.
    #[derive(Clone)]
    struct TestOptions {
        // Number of row groups to write
        num_row_groups: usize,
        // Number of rows in each row group
        num_rows: usize,
        // Batch size used when reading back
        record_batch_size: usize,
        // Percentage of null values to generate, or `None` for a REQUIRED column
        null_percent: Option<usize>,
        // Write batch size passed to the writer properties
        write_batch_size: usize,
        // Maximum size of a data page in bytes
        max_data_page_size: usize,
        // Maximum size of a dictionary page in bytes
        max_dict_page_size: usize,
        // Parquet writer version to use
        writer_version: WriterVersion,
        // Statistics level written to the file
        enabled_statistics: EnabledStatistics,
        // Encoding used for the column
        encoding: Encoding,
        // Optional row selection together with its expected selected row count
        row_selections: Option<(RowSelection, usize)>,
        // Optional boolean mask applied as a row filter predicate
        row_filter: Option<Vec<bool>>,
        // Optional limit on the number of rows read
        limit: Option<usize>,
        // Optional number of rows to skip before reading
        offset: Option<usize>,
    }
2070
    impl std::fmt::Debug for TestOptions {
        // Manual impl so the potentially large `row_selections`/`row_filter`
        // payloads print as `is_some()` booleans, keeping failure output short
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestOptions")
                .field("num_row_groups", &self.num_row_groups)
                .field("num_rows", &self.num_rows)
                .field("record_batch_size", &self.record_batch_size)
                .field("null_percent", &self.null_percent)
                .field("write_batch_size", &self.write_batch_size)
                .field("max_data_page_size", &self.max_data_page_size)
                .field("max_dict_page_size", &self.max_dict_page_size)
                .field("writer_version", &self.writer_version)
                .field("enabled_statistics", &self.enabled_statistics)
                .field("encoding", &self.encoding)
                .field("row_selections", &self.row_selections.is_some())
                .field("row_filter", &self.row_filter.is_some())
                .field("limit", &self.limit)
                .field("offset", &self.offset)
                .finish()
        }
    }
2092
    impl Default for TestOptions {
        // Baseline: two row groups of 100 non-null PLAIN-encoded rows,
        // large page limits, V1 writer, page-level statistics
        fn default() -> Self {
            Self {
                num_row_groups: 2,
                num_rows: 100,
                record_batch_size: 15,
                null_percent: None,
                write_batch_size: 64,
                max_data_page_size: 1024 * 1024,
                max_dict_page_size: 1024 * 1024,
                writer_version: WriterVersion::PARQUET_1_0,
                enabled_statistics: EnabledStatistics::Page,
                encoding: Encoding::PLAIN,
                row_selections: None,
                row_filter: None,
                limit: None,
                offset: None,
            }
        }
    }
2113
    impl TestOptions {
        /// Creates options with the given shape, defaulting everything else
        fn new(num_row_groups: usize, num_rows: usize, record_batch_size: usize) -> Self {
            Self {
                num_row_groups,
                num_rows,
                record_batch_size,
                ..Default::default()
            }
        }

        /// Sets the percentage of generated values that are null
        fn with_null_percent(self, null_percent: usize) -> Self {
            Self {
                null_percent: Some(null_percent),
                ..self
            }
        }

        /// Caps the data page size (small values force multiple pages)
        fn with_max_data_page_size(self, max_data_page_size: usize) -> Self {
            Self {
                max_data_page_size,
                ..self
            }
        }

        /// Caps the dictionary page size (small values force a fallback)
        fn with_max_dict_page_size(self, max_dict_page_size: usize) -> Self {
            Self {
                max_dict_page_size,
                ..self
            }
        }

        /// Sets the statistics level written to the file
        fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
            Self {
                enabled_statistics,
                ..self
            }
        }

        /// Adds a random row selection over the total row count.
        /// Must be called before [`Self::with_row_filter`] so the filter mask
        /// length matches the selected row count.
        fn with_row_selections(self) -> Self {
            assert!(self.row_filter.is_none(), "Must set row selection first");

            let mut rng = rng();
            let step = rng.random_range(self.record_batch_size..self.num_rows);
            let row_selections = create_test_selection(
                step,
                self.num_row_groups * self.num_rows,
                rng.random::<bool>(),
            );
            Self {
                row_selections: Some(row_selections),
                ..self
            }
        }

        /// Adds a random boolean row filter (~90% of rows kept), sized to the
        /// row count remaining after any row selection
        fn with_row_filter(self) -> Self {
            let row_count = match &self.row_selections {
                Some((_, count)) => *count,
                None => self.num_row_groups * self.num_rows,
            };

            let mut rng = rng();
            Self {
                row_filter: Some((0..row_count).map(|_| rng.random_bool(0.9)).collect()),
                ..self
            }
        }

        /// Limits the number of rows read back
        fn with_limit(self, limit: usize) -> Self {
            Self {
                limit: Some(limit),
                ..self
            }
        }

        /// Skips `offset` rows before reading
        fn with_offset(self, offset: usize) -> Self {
            Self {
                offset: Some(offset),
                ..self
            }
        }

        /// Builds the writer properties implied by these options.
        /// Dictionary encodings are requested via the dictionary flag rather
        /// than `set_encoding`; everything else disables the dictionary.
        fn writer_props(&self) -> WriterProperties {
            let builder = WriterProperties::builder()
                .set_data_page_size_limit(self.max_data_page_size)
                .set_write_batch_size(self.write_batch_size)
                .set_writer_version(self.writer_version)
                .set_statistics_enabled(self.enabled_statistics);

            let builder = match self.encoding {
                Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
                    .set_dictionary_enabled(true)
                    .set_dictionary_page_size_limit(self.max_dict_page_size),
                _ => builder
                    .set_dictionary_enabled(false)
                    .set_encoding(self.encoding),
            };

            builder.build()
        }
    }
2214
    /// Runs [`single_column_reader_test`] for a matrix of [`TestOptions`]
    /// permutations, crossed with both writer versions and every encoding in
    /// `encodings`.
    ///
    /// * `rand_max` - upper bound for generated values (also the FLBA length)
    /// * `converted_type` - parquet converted type for the column
    /// * `arrow_type` - optional arrow type hint written to the file
    /// * `converter` - builds the expected arrow array from the expected values
    fn run_single_column_reader_tests<T, F, G>(
        rand_max: i32,
        converted_type: ConvertedType,
        arrow_type: Option<ArrowDataType>,
        converter: F,
        encodings: &[Encoding],
    ) where
        T: DataType,
        G: RandGen<T>,
        F: Fn(&[Option<T::T>]) -> ArrayRef,
    {
        let all_options = vec![
            // Basic row-group / batch-size shapes
            TestOptions::new(2, 100, 15),
            TestOptions::new(3, 25, 5),
            TestOptions::new(4, 100, 25),
            // Small page limits to force multiple pages per chunk
            TestOptions::new(3, 256, 73).with_max_data_page_size(128),
            TestOptions::new(3, 256, 57).with_max_dict_page_size(128),
            // Nullability variations
            TestOptions::new(2, 256, 127).with_null_percent(0),
            TestOptions::new(2, 256, 93).with_null_percent(25),
            // Limits, including 0 and one past the total row count
            TestOptions::new(4, 100, 25).with_limit(0),
            TestOptions::new(4, 100, 25).with_limit(50),
            TestOptions::new(4, 100, 25).with_limit(10),
            TestOptions::new(4, 100, 25).with_limit(101),
            // Offset combined with limit
            TestOptions::new(4, 100, 25).with_offset(30).with_limit(20),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(80),
            TestOptions::new(4, 100, 25).with_offset(20).with_limit(81),
            // Reduced / disabled statistics
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::Chunk),
            TestOptions::new(2, 256, 91)
                .with_null_percent(25)
                .with_enabled_statistics(EnabledStatistics::None),
            TestOptions::new(2, 128, 91)
                .with_null_percent(100)
                .with_enabled_statistics(EnabledStatistics::None),
            // Row selections over the earlier shapes
            TestOptions::new(2, 100, 15).with_row_selections(),
            TestOptions::new(3, 25, 5).with_row_selections(),
            TestOptions::new(4, 100, 25).with_row_selections(),
            TestOptions::new(3, 256, 73)
                .with_max_data_page_size(128)
                .with_row_selections(),
            TestOptions::new(3, 256, 57)
                .with_max_dict_page_size(128)
                .with_row_selections(),
            TestOptions::new(2, 256, 127)
                .with_null_percent(0)
                .with_row_selections(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections(),
            // Selections combined with limit / offset
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_limit(10),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_row_selections()
                .with_offset(20)
                .with_limit(10),
            // Row filters, alone and combined with selections / tiny pages
            TestOptions::new(4, 100, 25).with_row_filter(),
            TestOptions::new(4, 100, 25)
                .with_row_selections()
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_filter(),
            TestOptions::new(2, 256, 93)
                .with_null_percent(25)
                .with_max_data_page_size(10)
                .with_row_selections()
                .with_row_filter(),
            // Selections with tiny pages and no statistics
            TestOptions::new(2, 256, 93)
                .with_enabled_statistics(EnabledStatistics::None)
                .with_max_data_page_size(10)
                .with_row_selections(),
        ];

        // Cross every option set with both writer versions and all encodings
        all_options.into_iter().for_each(|opts| {
            for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
                for encoding in encodings {
                    let opts = TestOptions {
                        writer_version,
                        encoding: *encoding,
                        ..opts.clone()
                    };

                    single_column_reader_test::<T, _, G>(
                        opts,
                        rand_max,
                        converted_type,
                        arrow_type.clone(),
                        &converter,
                    )
                }
            }
        });
    }
2366
2367 fn single_column_reader_test<T, F, G>(
2371 opts: TestOptions,
2372 rand_max: i32,
2373 converted_type: ConvertedType,
2374 arrow_type: Option<ArrowDataType>,
2375 converter: F,
2376 ) where
2377 T: DataType,
2378 G: RandGen<T>,
2379 F: Fn(&[Option<T::T>]) -> ArrayRef,
2380 {
2381 println!(
2383 "Running type {:?} single_column_reader_test ConvertedType::{}/ArrowType::{:?} with Options: {:?}",
2384 T::get_physical_type(), converted_type, arrow_type, opts
2385 );
2386
2387 let (repetition, def_levels) = match opts.null_percent.as_ref() {
2389 Some(null_percent) => {
2390 let mut rng = rng();
2391
2392 let def_levels: Vec<Vec<i16>> = (0..opts.num_row_groups)
2393 .map(|_| {
2394 std::iter::from_fn(|| {
2395 Some((rng.next_u32() as usize % 100 >= *null_percent) as i16)
2396 })
2397 .take(opts.num_rows)
2398 .collect()
2399 })
2400 .collect();
2401 (Repetition::OPTIONAL, Some(def_levels))
2402 }
2403 None => (Repetition::REQUIRED, None),
2404 };
2405
2406 let values: Vec<Vec<T::T>> = (0..opts.num_row_groups)
2408 .map(|idx| {
2409 let null_count = match def_levels.as_ref() {
2410 Some(d) => d[idx].iter().filter(|x| **x == 0).count(),
2411 None => 0,
2412 };
2413 G::gen_vec(rand_max, opts.num_rows - null_count)
2414 })
2415 .collect();
2416
2417 let len = match T::get_physical_type() {
2418 crate::basic::Type::FIXED_LEN_BYTE_ARRAY => rand_max,
2419 crate::basic::Type::INT96 => 12,
2420 _ => -1,
2421 };
2422
2423 let fields = vec![Arc::new(
2424 Type::primitive_type_builder("leaf", T::get_physical_type())
2425 .with_repetition(repetition)
2426 .with_converted_type(converted_type)
2427 .with_length(len)
2428 .build()
2429 .unwrap(),
2430 )];
2431
2432 let schema = Arc::new(
2433 Type::group_type_builder("test_schema")
2434 .with_fields(fields)
2435 .build()
2436 .unwrap(),
2437 );
2438
2439 let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false));
2440
2441 let mut file = tempfile::tempfile().unwrap();
2442
2443 generate_single_column_file_with_data::<T>(
2444 &values,
2445 def_levels.as_ref(),
2446 file.try_clone().unwrap(), schema,
2448 arrow_field,
2449 &opts,
2450 )
2451 .unwrap();
2452
2453 file.rewind().unwrap();
2454
2455 let options = ArrowReaderOptions::new()
2456 .with_page_index(opts.enabled_statistics == EnabledStatistics::Page);
2457
2458 let mut builder =
2459 ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
2460
2461 let expected_data = match opts.row_selections {
2462 Some((selections, row_count)) => {
2463 let mut without_skip_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2464
2465 let mut skip_data: Vec<Option<T::T>> = vec![];
2466 let dequeue: VecDeque<RowSelector> = selections.clone().into();
2467 for select in dequeue {
2468 if select.skip {
2469 without_skip_data.drain(0..select.row_count);
2470 } else {
2471 skip_data.extend(without_skip_data.drain(0..select.row_count));
2472 }
2473 }
2474 builder = builder.with_row_selection(selections);
2475
2476 assert_eq!(skip_data.len(), row_count);
2477 skip_data
2478 }
2479 None => {
2480 let expected_data = gen_expected_data::<T>(def_levels.as_ref(), &values);
2482 assert_eq!(expected_data.len(), opts.num_rows * opts.num_row_groups);
2483 expected_data
2484 }
2485 };
2486
2487 let mut expected_data = match opts.row_filter {
2488 Some(filter) => {
2489 let expected_data = expected_data
2490 .into_iter()
2491 .zip(filter.iter())
2492 .filter_map(|(d, f)| f.then(|| d))
2493 .collect();
2494
2495 let mut filter_offset = 0;
2496 let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
2497 ProjectionMask::all(),
2498 move |b| {
2499 let array = BooleanArray::from_iter(
2500 filter
2501 .iter()
2502 .skip(filter_offset)
2503 .take(b.num_rows())
2504 .map(|x| Some(*x)),
2505 );
2506 filter_offset += b.num_rows();
2507 Ok(array)
2508 },
2509 ))]);
2510
2511 builder = builder.with_row_filter(filter);
2512 expected_data
2513 }
2514 None => expected_data,
2515 };
2516
2517 if let Some(offset) = opts.offset {
2518 builder = builder.with_offset(offset);
2519 expected_data = expected_data.into_iter().skip(offset).collect();
2520 }
2521
2522 if let Some(limit) = opts.limit {
2523 builder = builder.with_limit(limit);
2524 expected_data = expected_data.into_iter().take(limit).collect();
2525 }
2526
2527 let mut record_reader = builder
2528 .with_batch_size(opts.record_batch_size)
2529 .build()
2530 .unwrap();
2531
2532 let mut total_read = 0;
2533 loop {
2534 let maybe_batch = record_reader.next();
2535 if total_read < expected_data.len() {
2536 let end = min(total_read + opts.record_batch_size, expected_data.len());
2537 let batch = maybe_batch.unwrap().unwrap();
2538 assert_eq!(end - total_read, batch.num_rows());
2539
2540 let a = converter(&expected_data[total_read..end]);
2541 let b = Arc::clone(batch.column(0));
2542
2543 assert_eq!(a.data_type(), b.data_type());
2544 assert_eq!(a.to_data(), b.to_data());
2545 assert_eq!(
2546 a.as_any().type_id(),
2547 b.as_any().type_id(),
2548 "incorrect type ids"
2549 );
2550
2551 total_read = end;
2552 } else {
2553 assert!(maybe_batch.is_none());
2554 break;
2555 }
2556 }
2557 }
2558
2559 fn gen_expected_data<T: DataType>(
2560 def_levels: Option<&Vec<Vec<i16>>>,
2561 values: &[Vec<T::T>],
2562 ) -> Vec<Option<T::T>> {
2563 let data: Vec<Option<T::T>> = match def_levels {
2564 Some(levels) => {
2565 let mut values_iter = values.iter().flatten();
2566 levels
2567 .iter()
2568 .flatten()
2569 .map(|d| match d {
2570 1 => Some(values_iter.next().cloned().unwrap()),
2571 0 => None,
2572 _ => unreachable!(),
2573 })
2574 .collect()
2575 }
2576 None => values.iter().flatten().map(|b| Some(b.clone())).collect(),
2577 };
2578 data
2579 }
2580
    /// Writes `values` (one inner `Vec` per row group) for a single column to
    /// `file`, optionally embedding the arrow schema for `field` in the file
    /// metadata, and returns the metadata produced by closing the writer.
    fn generate_single_column_file_with_data<T: DataType>(
        values: &[Vec<T::T>],
        def_levels: Option<&Vec<Vec<i16>>>,
        file: File,
        schema: TypePtr,
        field: Option<Field>,
        opts: &TestOptions,
    ) -> Result<crate::format::FileMetaData> {
        let mut writer_props = opts.writer_props();
        if let Some(field) = field {
            // Embedding the arrow schema lets the reader recover the
            // requested arrow type on read
            let arrow_schema = Schema::new(vec![field]);
            add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props);
        }

        let mut writer = SerializedFileWriter::new(file, schema, Arc::new(writer_props))?;

        // One row group per outer `values` entry
        for (idx, v) in values.iter().enumerate() {
            let def_levels = def_levels.map(|d| d[idx].as_slice());
            let mut row_group_writer = writer.next_row_group()?;
            {
                let mut column_writer = row_group_writer
                    .next_column()?
                    .expect("Column writer is none!");

                column_writer
                    .typed::<T>()
                    .write_batch(v, def_levels, None)?;

                column_writer.close()?;
            }
            row_group_writer.close()?;
        }

        writer.close()
    }
2616
2617 fn get_test_file(file_name: &str) -> File {
2618 let mut path = PathBuf::new();
2619 path.push(arrow::util::test_util::arrow_test_data());
2620 path.push(file_name);
2621
2622 File::open(path.as_path()).expect("File not found!")
2623 }
2624
    #[test]
    fn test_read_structs() {
        // First pass: the whole file must decode without error
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        // Second pass: project three leaf columns by index and verify the
        // resulting struct schema
        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::leaves(builder.parquet_schema(), [3, 8, 10]);
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }
2677
    #[test]
    fn test_read_structs_by_name() {
        // Same as test_read_structs but the projection is built from dotted
        // column paths instead of leaf indices
        let testdata = arrow::util::test_util::parquet_test_data();
        let path = format!("{testdata}/nested_structs.rust.parquet");
        let file = File::open(&path).unwrap();
        let record_batch_reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

        for batch in record_batch_reader {
            batch.unwrap();
        }

        let file = File::open(&path).unwrap();
        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();

        let mask = ProjectionMask::columns(
            builder.parquet_schema(),
            ["roll_num.count", "PC_CUR.mean", "PC_CUR.sum"],
        );
        let projected_reader = builder
            .with_projection(mask)
            .with_batch_size(60)
            .build()
            .unwrap();

        let expected_schema = Schema::new(vec![
            Field::new(
                "roll_num",
                ArrowDataType::Struct(Fields::from(vec![Field::new(
                    "count",
                    ArrowDataType::UInt64,
                    false,
                )])),
                false,
            ),
            Field::new(
                "PC_CUR",
                ArrowDataType::Struct(Fields::from(vec![
                    Field::new("mean", ArrowDataType::Int64, false),
                    Field::new("sum", ArrowDataType::Int64, false),
                ])),
                false,
            ),
        ]);

        assert_eq!(&expected_schema, projected_reader.schema().as_ref());

        for batch in projected_reader {
            let batch = batch.unwrap();
            assert_eq!(batch.schema().as_ref(), &expected_schema);
        }
    }
2730
#[test]
fn test_read_maps() {
    // Smoke test: every batch of the nested-map fixture decodes without error.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/nested_maps.snappy.parquet");
    let reader = ParquetRecordBatchReader::try_new(File::open(path).unwrap(), 60).unwrap();
    for batch in reader {
        batch.unwrap();
    }
}
2742
#[test]
fn test_nested_nullability() {
    // A REQUIRED leaf inside an OPTIONAL group: definition level 1 marks a
    // present leaf value, definition level 0 marks a null group.
    let message_type = "message nested {
        OPTIONAL Group group {
            REQUIRED INT32 leaf;
        }
    }";

    let file = tempfile::tempfile().unwrap();
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    {
        let mut writer =
            SerializedFileWriter::new(file.try_clone().unwrap(), schema, Default::default())
                .unwrap();

        {
            let mut row_group_writer = writer.next_row_group().unwrap();
            let mut column_writer = row_group_writer.next_column().unwrap().unwrap();

            // Def levels [0, 1, 0, 1]: rows 0 and 2 have a null group; rows 1
            // and 3 carry the values 34 and 76.
            column_writer
                .typed::<Int32Type>()
                .write_batch(&[34, 76], Some(&[0, 1, 0, 1]), None)
                .unwrap();

            column_writer.close().unwrap();
            row_group_writer.close().unwrap();
        }

        writer.close().unwrap();
    }

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);

    let reader = builder.with_projection(mask).build().unwrap();

    // Nullability must surface on the struct (the optional group), while the
    // inner leaf stays non-nullable.
    let expected_schema = Schema::new(Fields::from(vec![Field::new(
        "group",
        ArrowDataType::Struct(vec![Field::new("leaf", ArrowDataType::Int32, false)].into()),
        true,
    )]));

    let batch = reader.into_iter().next().unwrap().unwrap();
    assert_eq!(batch.schema().as_ref(), &expected_schema);
    assert_eq!(batch.num_rows(), 4);
    // The two def-level-0 rows appear as nulls of the struct column.
    assert_eq!(batch.column(0).null_count(), 2);
}
2792
#[test]
fn test_invalid_utf8() {
    // A hand-crafted parquet file whose single string column contains the
    // bytes 104, 101, 255, 108, 111 ("he\xFFllo" — 0xFF is never valid in
    // UTF-8); the same bytes also appear in the min/max statistics below.
    let data = vec![
        80, 65, 82, 49, 21, 6, 21, 22, 21, 22, 92, 21, 2, 21, 0, 21, 2, 21, 0, 21, 4, 21, 0,
        18, 28, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0,
        3, 1, 5, 0, 0, 0, 104, 101, 255, 108, 111, 38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24,
        2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102, 38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108,
        111, 24, 5, 104, 101, 255, 108, 111, 0, 0, 0, 21, 4, 25, 44, 72, 4, 114, 111, 111, 116,
        21, 2, 0, 21, 12, 37, 2, 24, 2, 99, 49, 37, 0, 76, 28, 0, 0, 0, 22, 2, 25, 28, 25, 28,
        38, 110, 28, 21, 12, 25, 37, 6, 0, 25, 24, 2, 99, 49, 21, 0, 22, 2, 22, 102, 22, 102,
        38, 8, 60, 54, 0, 40, 5, 104, 101, 255, 108, 111, 24, 5, 104, 101, 255, 108, 111, 0, 0,
        0, 22, 102, 22, 2, 0, 40, 44, 65, 114, 114, 111, 119, 50, 32, 45, 32, 78, 97, 116, 105,
        118, 101, 32, 82, 117, 115, 116, 32, 105, 109, 112, 108, 101, 109, 101, 110, 116, 97,
        116, 105, 111, 110, 32, 111, 102, 32, 65, 114, 114, 111, 119, 0, 130, 0, 0, 0, 80, 65,
        82, 49,
    ];

    let file = Bytes::from(data);
    let mut record_batch_reader = ParquetRecordBatchReader::try_new(file, 10).unwrap();

    // Decoding must fail rather than hand back an invalid string array.
    let error = record_batch_reader.next().unwrap().unwrap_err();

    assert!(
        error.to_string().contains("invalid utf-8 sequence"),
        "{}",
        error
    );
}
2822
#[test]
fn test_invalid_utf8_string_array() {
    // i32 offsets: plain StringArray round trip.
    test_invalid_utf8_string_array_inner::<i32>();
}
2827
#[test]
fn test_invalid_utf8_large_string_array() {
    // i64 offsets: LargeStringArray round trip.
    test_invalid_utf8_string_array_inner::<i64>();
}
2832
fn test_invalid_utf8_string_array_inner<O: OffsetSizeTrait>() {
    // Each fixture is a BinaryArray holding at least one non-UTF-8 payload.
    let cases = [
        invalid_utf8_first_char::<O>(),
        invalid_utf8_first_char_long_strings::<O>(),
        invalid_utf8_later_char::<O>(),
        invalid_utf8_later_char_long_strings::<O>(),
        invalid_utf8_later_char_really_long_strings::<O>(),
        invalid_utf8_later_char_really_long_strings2::<O>(),
    ];
    for array in &cases {
        for encoding in STRING_ENCODINGS {
            // SAFETY: deliberately unsound — reinterpret the binary buffers as
            // a string array WITHOUT validation so invalid UTF-8 reaches the
            // parquet writer; the reader is then expected to reject it below.
            let array = unsafe {
                GenericStringArray::<O>::new_unchecked(
                    array.offsets().clone(),
                    array.values().clone(),
                    array.nulls().cloned(),
                )
            };
            let data_type = array.data_type().clone();
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let err = read_from_parquet(data).unwrap_err();
            let expected_err =
                "Parquet argument error: Parquet error: encountered non UTF-8 data";
            assert!(
                err.to_string().contains(expected_err),
                "data type: {data_type:?}, expected: {expected_err}, got: {err}"
            );
        }
    }
}
2865
#[test]
fn test_invalid_utf8_string_view_array() {
    // Same invalid-UTF-8 fixtures as the offset-based tests, but exercised
    // through the view (BinaryView/StringView) representation.
    let cases = [
        invalid_utf8_first_char::<i32>(),
        invalid_utf8_first_char_long_strings::<i32>(),
        invalid_utf8_later_char::<i32>(),
        invalid_utf8_later_char_long_strings::<i32>(),
        invalid_utf8_later_char_really_long_strings::<i32>(),
        invalid_utf8_later_char_really_long_strings2::<i32>(),
    ];

    for encoding in STRING_ENCODINGS {
        for array in &cases {
            let array = arrow_cast::cast(&array, &ArrowDataType::BinaryView).unwrap();
            let array = array.as_binary_view();

            // SAFETY: deliberately unsound — rebuild the views as a
            // StringViewArray without UTF-8 validation so the invalid bytes
            // reach the writer; the reader must reject them below.
            let array = unsafe {
                StringViewArray::new_unchecked(
                    array.views().clone(),
                    array.data_buffers().to_vec(),
                    array.nulls().cloned(),
                )
            };

            let data_type = array.data_type().clone();
            let data = write_to_parquet_with_encoding(Arc::new(array), *encoding);
            let err = read_from_parquet(data).unwrap_err();
            let expected_err =
                "Parquet argument error: Parquet error: encountered non UTF-8 data";
            assert!(
                err.to_string().contains(expected_err),
                "data type: {data_type:?}, expected: {expected_err}, got: {err}"
            );
        }
    }
}
2904
/// Encodings exercised when writing the invalid-UTF-8 fixtures; `None` leaves
/// the writer's default encoding in place.
const STRING_ENCODINGS: &[Option<Encoding>] = &[
    None,
    Some(Encoding::PLAIN),
    Some(Encoding::DELTA_LENGTH_BYTE_ARRAY),
    Some(Encoding::DELTA_BYTE_ARRAY),
];

/// Invalid from the very first byte: 0xa0/0xa1 are not valid UTF-8 lead bytes.
const INVALID_UTF8_FIRST_CHAR: &[u8] = &[0xa0, 0xa1, 0x20, 0x20];

/// Valid ASCII spaces followed by an invalid 0xa0/0xa1 sequence.
const INVALID_UTF8_LATER_CHAR: &[u8] = &[0x20, 0x20, 0x20, 0xa0, 0xa1, 0x20, 0x20];
2921 fn invalid_utf8_first_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2923 let valid: &[u8] = b" ";
2924 let invalid = INVALID_UTF8_FIRST_CHAR;
2925 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2926 }
2927
2928 fn invalid_utf8_first_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2932 let valid: &[u8] = b" ";
2933 let mut invalid = vec![];
2934 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2935 invalid.extend_from_slice(INVALID_UTF8_FIRST_CHAR);
2936 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2937 }
2938
2939 fn invalid_utf8_later_char<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2942 let valid: &[u8] = b" ";
2943 let invalid: &[u8] = INVALID_UTF8_LATER_CHAR;
2944 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(invalid)])
2945 }
2946
2947 fn invalid_utf8_later_char_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2951 let valid: &[u8] = b" ";
2952 let mut invalid = vec![];
2953 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2954 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2955 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2956 }
2957
2958 fn invalid_utf8_later_char_really_long_strings<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2962 let valid: &[u8] = b" ";
2963 let mut invalid = vec![];
2964 for _ in 0..10 {
2965 invalid.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2967 }
2968 invalid.extend_from_slice(INVALID_UTF8_LATER_CHAR);
2969 GenericBinaryArray::<O>::from_iter(vec![None, Some(valid), None, Some(&invalid)])
2970 }
2971
2972 fn invalid_utf8_later_char_really_long_strings2<O: OffsetSizeTrait>() -> GenericBinaryArray<O> {
2975 let valid: &[u8] = b" ";
2976 let mut valid_long = vec![];
2977 for _ in 0..10 {
2978 valid_long.extend_from_slice(b"ThisStringIsCertainlyLongerThan12Bytes");
2980 }
2981 let invalid = INVALID_UTF8_LATER_CHAR;
2982 GenericBinaryArray::<O>::from_iter(vec![
2983 None,
2984 Some(valid),
2985 Some(invalid),
2986 None,
2987 Some(&valid_long),
2988 Some(valid),
2989 ])
2990 }
2991
2992 fn write_to_parquet_with_encoding(array: ArrayRef, encoding: Option<Encoding>) -> Vec<u8> {
2997 let batch = RecordBatch::try_from_iter(vec![("c", array)]).unwrap();
2998 let mut data = vec![];
2999 let schema = batch.schema();
3000 let props = encoding.map(|encoding| {
3001 WriterProperties::builder()
3002 .set_dictionary_enabled(false)
3004 .set_encoding(encoding)
3005 .build()
3006 });
3007
3008 {
3009 let mut writer = ArrowWriter::try_new(&mut data, schema, props).unwrap();
3010 writer.write(&batch).unwrap();
3011 writer.flush().unwrap();
3012 writer.close().unwrap();
3013 };
3014 data
3015 }
3016
3017 fn read_from_parquet(data: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
3019 let reader = ArrowReaderBuilder::try_new(bytes::Bytes::from(data))
3020 .unwrap()
3021 .build()
3022 .unwrap();
3023
3024 reader.collect()
3025 }
3026
#[test]
fn test_dictionary_preservation() {
    // Writes two row groups of dictionary-encoded strings and checks which
    // batches share the same backing dictionary when read back.
    let fields = vec![Arc::new(
        Type::primitive_type_builder("leaf", PhysicalType::BYTE_ARRAY)
            .with_repetition(Repetition::OPTIONAL)
            .with_converted_type(ConvertedType::UTF8)
            .build()
            .unwrap(),
    )];

    let schema = Arc::new(
        Type::group_type_builder("test_schema")
            .with_fields(fields)
            .build()
            .unwrap(),
    );

    // Dictionary<Int32, Utf8> hint so the reader yields dictionary arrays
    // rather than plain strings.
    let dict_type = ArrowDataType::Dictionary(
        Box::new(ArrowDataType::Int32),
        Box::new(ArrowDataType::Utf8),
    );

    let arrow_field = Field::new("leaf", dict_type, true);

    let mut file = tempfile::tempfile().unwrap();

    // One Vec per row group; def level 1 = present value, 0 = null.
    let values = vec![
        vec![
            ByteArray::from("hello"),
            ByteArray::from("a"),
            ByteArray::from("b"),
            ByteArray::from("d"),
        ],
        vec![
            ByteArray::from("c"),
            ByteArray::from("a"),
            ByteArray::from("b"),
        ],
    ];

    let def_levels = vec![
        vec![1, 0, 0, 1, 0, 0, 1, 1],
        vec![0, 0, 1, 1, 0, 0, 1, 0, 0],
    ];

    let opts = TestOptions {
        encoding: Encoding::RLE_DICTIONARY,
        ..Default::default()
    };

    generate_single_column_file_with_data::<ByteArrayType>(
        &values,
        Some(&def_levels),
        file.try_clone().unwrap(), schema,
        Some(arrow_field),
        &opts,
    )
    .unwrap();

    file.rewind().unwrap();

    // 17 rows (8 + 9) read 3 at a time -> 6 batches.
    let record_reader = ParquetRecordBatchReader::try_new(file, 3).unwrap();

    let batches = record_reader
        .collect::<Result<Vec<RecordBatch>, _>>()
        .unwrap();

    assert_eq!(batches.len(), 6);
    assert!(batches.iter().all(|x| x.num_columns() == 1));

    // (row count, null count) per batch.
    let row_counts = batches
        .iter()
        .map(|x| (x.num_rows(), x.column(0).null_count()))
        .collect::<Vec<_>>();

    assert_eq!(
        row_counts,
        vec![(3, 2), (3, 2), (3, 1), (3, 1), (3, 2), (2, 2)]
    );

    // Dictionary values backing a batch's column (child data of the
    // dictionary array).
    let get_dict = |batch: &RecordBatch| batch.column(0).to_data().child_data()[0].clone();

    // Batches within a single row group reuse one dictionary; batches that
    // straddle or follow the row-group boundary get a fresh one.
    assert_eq!(get_dict(&batches[0]), get_dict(&batches[1]));
    assert_ne!(get_dict(&batches[1]), get_dict(&batches[2]));
    assert_ne!(get_dict(&batches[2]), get_dict(&batches[3]));
    assert_eq!(get_dict(&batches[3]), get_dict(&batches[4]));
    assert_eq!(get_dict(&batches[4]), get_dict(&batches[5]));
}
3119
#[test]
fn test_read_null_list() {
    // The fixture's single row holds a present-but-empty list.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/null_list.parquet");
    let file = File::open(path).unwrap();
    let mut reader = ParquetRecordBatchReader::try_new(file, 60).unwrap();

    let batch = reader.next().unwrap().unwrap();
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.column(0).len(), 1);

    let list = batch
        .column(0)
        .as_any()
        .downcast_ref::<ListArray>()
        .unwrap();
    assert_eq!(list.len(), 1);
    // Valid (not null) but with zero elements.
    assert!(list.is_valid(0));
    assert_eq!(list.value(0).len(), 0);
}
3143
#[test]
fn test_null_schema_inference() {
    // With the embedded arrow metadata skipped, the reader must infer
    // List<Null> for a column that only ever held null values.
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/null_list.parquet")).unwrap();

    let expected_field = Field::new(
        "emptylist",
        ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Null, true))),
        true,
    );

    let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
    let schema = builder.schema();
    assert_eq!(schema.fields().len(), 1);
    assert_eq!(schema.field(0), &expected_field);
}
3162
#[test]
fn test_skip_metadata() {
    // `with_skip_arrow_metadata(true)` must drop the embedded arrow schema,
    // so field-level metadata disappears on read-back; without the option the
    // metadata round-trips. Checked for both writer versions.
    let col = Arc::new(TimestampNanosecondArray::from_iter_values(vec![0, 1, 2]));
    let field = Field::new("col", col.data_type().clone(), true);

    let schema_without_metadata = Arc::new(Schema::new(vec![field.clone()]));

    let metadata = [("key".to_string(), "value".to_string())]
        .into_iter()
        .collect();

    let schema_with_metadata = Arc::new(Schema::new(vec![field.with_metadata(metadata)]));

    assert_ne!(schema_with_metadata, schema_without_metadata);

    let batch =
        RecordBatch::try_new(schema_with_metadata.clone(), vec![col as ArrayRef]).unwrap();

    // Write the batch to a temp file with the given writer version.
    let file = |version: WriterVersion| {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .build();

        let file = tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .unwrap();
        writer.write(&batch).unwrap();
        writer.close().unwrap();
        file
    };

    let skip_options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);

    let v1_reader = file(WriterVersion::PARQUET_1_0);
    let v2_reader = file(WriterVersion::PARQUET_2_0);

    // Default read path: embedded arrow schema (with metadata) is honored.
    let arrow_reader =
        ParquetRecordBatchReader::try_new(v1_reader.try_clone().unwrap(), 1024).unwrap();
    assert_eq!(arrow_reader.schema(), schema_with_metadata);

    // Skipping arrow metadata re-derives the schema from the parquet types.
    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(v1_reader, skip_options.clone())
            .unwrap()
            .build()
            .unwrap();
    assert_eq!(reader.schema(), schema_without_metadata);

    let arrow_reader =
        ParquetRecordBatchReader::try_new(v2_reader.try_clone().unwrap(), 1024).unwrap();
    assert_eq!(arrow_reader.schema(), schema_with_metadata);

    let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(v2_reader, skip_options)
        .unwrap()
        .build()
        .unwrap();
    assert_eq!(reader.schema(), schema_without_metadata);
}
3221
3222 fn write_parquet_from_iter<I, F>(value: I) -> File
3223 where
3224 I: IntoIterator<Item = (F, ArrayRef)>,
3225 F: AsRef<str>,
3226 {
3227 let batch = RecordBatch::try_from_iter(value).unwrap();
3228 let file = tempfile().unwrap();
3229 let mut writer =
3230 ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema().clone(), None).unwrap();
3231 writer.write(&batch).unwrap();
3232 writer.close().unwrap();
3233 file
3234 }
3235
3236 fn run_schema_test_with_error<I, F>(value: I, schema: SchemaRef, expected_error: &str)
3237 where
3238 I: IntoIterator<Item = (F, ArrayRef)>,
3239 F: AsRef<str>,
3240 {
3241 let file = write_parquet_from_iter(value);
3242 let options_with_schema = ArrowReaderOptions::new().with_schema(schema.clone());
3243 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3244 file.try_clone().unwrap(),
3245 options_with_schema,
3246 );
3247 assert_eq!(builder.err().unwrap().to_string(), expected_error);
3248 }
3249
#[test]
fn test_schema_too_few_columns() {
    // Supplied schema with fewer top-level fields than the file is rejected
    // at builder construction.
    run_schema_test_with_error(
        vec![
            ("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("int32", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![Field::new(
            "int64",
            ArrowDataType::Int64,
            false,
        )])),
        "Arrow: incompatible arrow schema, expected 2 struct fields got 1",
    );
}
3265
#[test]
fn test_schema_too_many_columns() {
    // Supplied schema with more top-level fields than the file is rejected.
    run_schema_test_with_error(
        vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
        Arc::new(Schema::new(vec![
            Field::new("int64", ArrowDataType::Int64, false),
            Field::new("int32", ArrowDataType::Int32, false),
        ])),
        "Arrow: incompatible arrow schema, expected 1 struct fields got 2",
    );
}
3277
#[test]
fn test_schema_mismatched_column_names() {
    // Field names in the supplied schema must match the file's field names.
    run_schema_test_with_error(
        vec![("int64", Arc::new(Int64Array::from(vec![0])) as ArrayRef)],
        Arc::new(Schema::new(vec![Field::new(
            "other",
            ArrowDataType::Int64,
            false,
        )])),
        "Arrow: incompatible arrow schema, expected field named int64 got other",
    );
}
3290
#[test]
fn test_schema_incompatible_columns() {
    // Non-castable type requests are rejected, and the error names every
    // offending field (col2 Int32->Int32 is fine; the other two are not).
    run_schema_test_with_error(
        vec![
            (
                "col1_invalid",
                Arc::new(Int64Array::from(vec![0])) as ArrayRef,
            ),
            (
                "col2_valid",
                Arc::new(Int32Array::from(vec![0])) as ArrayRef,
            ),
            (
                "col3_invalid",
                Arc::new(Date64Array::from(vec![0])) as ArrayRef,
            ),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1_invalid", ArrowDataType::Int32, false),
            Field::new("col2_valid", ArrowDataType::Int32, false),
            Field::new("col3_invalid", ArrowDataType::Int32, false),
        ])),
        "Arrow: incompatible arrow schema, the following fields could not be cast: [col1_invalid, col3_invalid]",
    );
}
3316
#[test]
fn test_one_incompatible_nested_column() {
    // An incompatible field *inside* a struct is reported against the
    // top-level struct field ("nested"), not the inner leaf.
    let nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int64, false),
    ]);
    let nested = StructArray::try_new(
        nested_fields,
        vec![
            Arc::new(StringArray::from(vec!["a"])) as ArrayRef,
            Arc::new(Int64Array::from(vec![0])) as ArrayRef,
        ],
        None,
    )
    .expect("struct array");
    // Requests Int32 for the Int64 inner field — not castable.
    let supplied_nested_fields = Fields::from(vec![
        Field::new("nested1_valid", ArrowDataType::Utf8, false),
        Field::new("nested1_invalid", ArrowDataType::Int32, false),
    ]);
    run_schema_test_with_error(
        vec![
            ("col1", Arc::new(Int64Array::from(vec![0])) as ArrayRef),
            ("col2", Arc::new(Int32Array::from(vec![0])) as ArrayRef),
            ("nested", Arc::new(nested) as ArrayRef),
        ],
        Arc::new(Schema::new(vec![
            Field::new("col1", ArrowDataType::Int64, false),
            Field::new("col2", ArrowDataType::Int32, false),
            Field::new(
                "nested",
                ArrowDataType::Struct(supplied_nested_fields),
                false,
            ),
        ])),
        "Arrow: incompatible arrow schema, the following fields could not be cast: [nested]",
    );
}
3354
#[test]
fn test_read_binary_as_utf8() {
    // Each binary-family column (Binary, LargeBinary, BinaryView) can be read
    // back as its UTF-8 counterpart via a supplied schema, since the payloads
    // are valid UTF-8.
    let file = write_parquet_from_iter(vec![
        (
            "binary_to_utf8",
            Arc::new(BinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "large_binary_to_large_utf8",
            Arc::new(LargeBinaryArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
        (
            "binary_view_to_utf8_view",
            Arc::new(BinaryViewArray::from(vec![
                b"one".as_ref(),
                b"two".as_ref(),
                b"three".as_ref(),
            ])) as ArrayRef,
        ),
    ]);
    let supplied_fields = Fields::from(vec![
        Field::new("binary_to_utf8", ArrowDataType::Utf8, false),
        Field::new(
            "large_binary_to_large_utf8",
            ArrowDataType::LargeUtf8,
            false,
        ),
        Field::new("binary_view_to_utf8_view", ArrowDataType::Utf8View, false),
    ]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 3);
    // Column 0: Binary -> Utf8.
    assert_eq!(
        batch
            .column(0)
            .as_string::<i32>()
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );

    // Column 1: LargeBinary -> LargeUtf8.
    assert_eq!(
        batch
            .column(1)
            .as_string::<i64>()
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );

    // Column 2: BinaryView -> Utf8View.
    assert_eq!(
        batch.column(2).as_string_view().iter().collect::<Vec<_>>(),
        vec![Some("one"), Some("two"), Some("three")]
    );
}
3428
#[test]
#[should_panic(expected = "Invalid UTF8 sequence at")]
fn test_read_non_utf8_binary_as_utf8() {
    // A binary column with non-UTF-8 bytes read under a supplied schema that
    // claims Utf8 must fail during decoding.
    let file = write_parquet_from_iter(vec![(
        "non_utf8_binary",
        Arc::new(BinaryArray::from(vec![
            b"\xDE\x00\xFF".as_ref(),
            b"\xDE\x01\xAA".as_ref(),
            b"\xDE\x02\xFF".as_ref(),
        ])) as ArrayRef,
    )]);
    let supplied_fields = Fields::from(vec![Field::new(
        "non_utf8_binary",
        ArrowDataType::Utf8,
        false,
    )]);

    let options = ArrowReaderOptions::new().with_schema(Arc::new(Schema::new(supplied_fields)));
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");
    // NOTE(review): the expected panic presumably fires inside `next()` during
    // UTF-8 validation — a plain `Err` return here would make `unwrap_err`
    // succeed without panicking; confirm against the reader's validation path.
    arrow_reader.next().unwrap().unwrap_err();
}
3456
#[test]
fn test_with_schema() {
    // A supplied schema can request compatible conversions on read:
    //   int32  -> Timestamp(Second, tz)
    //   date32 -> Date64
    //   utf8   -> Dictionary<Int32, Utf8>   (inside a nested struct)
    //   int64  -> Timestamp(Nanosecond, tz) (inside a nested struct)
    let nested_fields = Fields::from(vec![
        Field::new("utf8_to_dict", ArrowDataType::Utf8, false),
        Field::new("int64_to_ts_nano", ArrowDataType::Int64, false),
    ]);

    let nested_arrays: Vec<ArrayRef> = vec![
        Arc::new(StringArray::from(vec!["a", "a", "a", "b"])) as ArrayRef,
        Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef,
    ];

    let nested = StructArray::try_new(nested_fields, nested_arrays, None).unwrap();

    let file = write_parquet_from_iter(vec![
        (
            "int32_to_ts_second",
            Arc::new(Int32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        (
            "date32_to_date64",
            Arc::new(Date32Array::from(vec![0, 1, 2, 3])) as ArrayRef,
        ),
        ("nested", Arc::new(nested) as ArrayRef),
    ]);

    let supplied_nested_fields = Fields::from(vec![
        Field::new(
            "utf8_to_dict",
            ArrowDataType::Dictionary(
                Box::new(ArrowDataType::Int32),
                Box::new(ArrowDataType::Utf8),
            ),
            false,
        ),
        Field::new(
            "int64_to_ts_nano",
            ArrowDataType::Timestamp(
                arrow::datatypes::TimeUnit::Nanosecond,
                Some("+10:00".into()),
            ),
            false,
        ),
    ]);

    let supplied_schema = Arc::new(Schema::new(vec![
        Field::new(
            "int32_to_ts_second",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Second, Some("+01:00".into())),
            false,
        ),
        Field::new("date32_to_date64", ArrowDataType::Date64, false),
        Field::new(
            "nested",
            ArrowDataType::Struct(supplied_nested_fields),
            false,
        ),
    ]));

    let options = ArrowReaderOptions::new().with_schema(supplied_schema.clone());
    let mut arrow_reader = ParquetRecordBatchReaderBuilder::try_new_with_options(
        file.try_clone().unwrap(),
        options,
    )
    .expect("reader builder with schema")
    .build()
    .expect("reader with schema");

    // The reader reports the supplied schema verbatim.
    assert_eq!(arrow_reader.schema(), supplied_schema);
    let batch = arrow_reader.next().unwrap().unwrap();
    assert_eq!(batch.num_columns(), 3);
    assert_eq!(batch.num_rows(), 4);
    // Raw value 0 rendered in the requested +01:00 zone.
    assert_eq!(
        batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .expect("downcast to timestamp second")
            .value_as_datetime_with_tz(0, "+01:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 01:00:00 +01:00"
    );
    assert_eq!(
        batch
            .column(1)
            .as_any()
            .downcast_ref::<Date64Array>()
            .expect("downcast to date64")
            .value_as_date(0)
            .map(|v| v.to_string())
            .expect("value as date"),
        "1970-01-01"
    );

    let nested = batch
        .column(2)
        .as_any()
        .downcast_ref::<StructArray>()
        .expect("downcast to struct");

    let nested_dict = nested
        .column(0)
        .as_any()
        .downcast_ref::<Int32DictionaryArray>()
        .expect("downcast to dictionary");

    // ["a", "a", "a", "b"] dictionary-encodes to values ["a", "b"] ...
    assert_eq!(
        nested_dict
            .values()
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("downcast to string")
            .iter()
            .collect::<Vec<_>>(),
        vec![Some("a"), Some("b")]
    );

    // ... with keys [0, 0, 0, 1].
    assert_eq!(
        nested_dict.keys().iter().collect::<Vec<_>>(),
        vec![Some(0), Some(0), Some(0), Some(1)]
    );

    // Raw value 1 (ns) rendered in the requested +10:00 zone.
    assert_eq!(
        nested
            .column(1)
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .expect("downcast to timestamp nanosecond")
            .value_as_datetime_with_tz(0, "+10:00".parse().unwrap())
            .map(|v| v.to_string())
            .expect("value as datetime"),
        "1970-01-01 10:00:00.000000001 +10:00"
    );
}
3592
#[test]
fn test_empty_projection() {
    // A projection selecting no leaves still yields zero-column batches with
    // the correct total row count.
    let testdata = arrow::util::test_util::parquet_test_data();
    let file = File::open(format!("{testdata}/alltypes_plain.parquet")).unwrap();

    let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
    let expected_rows = builder.metadata().file_metadata().num_rows() as usize;

    let mask = ProjectionMask::leaves(builder.parquet_schema(), []);
    let batch_reader = builder
        .with_projection(mask)
        .with_batch_size(2)
        .build()
        .unwrap();

    let mut total_rows = 0;
    for maybe_batch in batch_reader {
        let batch = maybe_batch.unwrap();
        assert_eq!(batch.num_columns(), 0);
        assert!(batch.num_rows() <= 2);
        total_rows += batch.num_rows();
    }
    assert_eq!(total_rows, expected_rows);
}
3620
3621 fn test_row_group_batch(row_group_size: usize, batch_size: usize) {
3622 let schema = Arc::new(Schema::new(vec![Field::new(
3623 "list",
3624 ArrowDataType::List(Arc::new(Field::new_list_field(ArrowDataType::Int32, true))),
3625 true,
3626 )]));
3627
3628 let mut buf = Vec::with_capacity(1024);
3629
3630 let mut writer = ArrowWriter::try_new(
3631 &mut buf,
3632 schema.clone(),
3633 Some(
3634 WriterProperties::builder()
3635 .set_max_row_group_size(row_group_size)
3636 .build(),
3637 ),
3638 )
3639 .unwrap();
3640 for _ in 0..2 {
3641 let mut list_builder = ListBuilder::new(Int32Builder::with_capacity(batch_size));
3642 for _ in 0..(batch_size) {
3643 list_builder.append(true);
3644 }
3645 let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(list_builder.finish())])
3646 .unwrap();
3647 writer.write(&batch).unwrap();
3648 }
3649 writer.close().unwrap();
3650
3651 let mut record_reader =
3652 ParquetRecordBatchReader::try_new(Bytes::from(buf), batch_size).unwrap();
3653 assert_eq!(
3654 batch_size,
3655 record_reader.next().unwrap().unwrap().num_rows()
3656 );
3657 assert_eq!(
3658 batch_size,
3659 record_reader.next().unwrap().unwrap().num_rows()
3660 );
3661 }
3662
#[test]
fn test_row_group_exact_multiple() {
    // Row-group/batch size combinations around the internal repetition-level
    // read batch size: exact multiples plus off-by-one on either side.
    const BATCH_SIZE: usize = REPETITION_LEVELS_BATCH_SIZE;
    test_row_group_batch(8, 8);
    test_row_group_batch(10, 8);
    test_row_group_batch(8, 10);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE);
    test_row_group_batch(BATCH_SIZE + 1, BATCH_SIZE);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE + 1);
    test_row_group_batch(BATCH_SIZE, BATCH_SIZE - 1);
    test_row_group_batch(BATCH_SIZE - 1, BATCH_SIZE);
}
3675
3676 fn get_expected_batches(
3679 column: &RecordBatch,
3680 selection: &RowSelection,
3681 batch_size: usize,
3682 ) -> Vec<RecordBatch> {
3683 let mut expected_batches = vec![];
3684
3685 let mut selection: VecDeque<_> = selection.clone().into();
3686 let mut row_offset = 0;
3687 let mut last_start = None;
3688 while row_offset < column.num_rows() && !selection.is_empty() {
3689 let mut batch_remaining = batch_size.min(column.num_rows() - row_offset);
3690 while batch_remaining > 0 && !selection.is_empty() {
3691 let (to_read, skip) = match selection.front_mut() {
3692 Some(selection) if selection.row_count > batch_remaining => {
3693 selection.row_count -= batch_remaining;
3694 (batch_remaining, selection.skip)
3695 }
3696 Some(_) => {
3697 let select = selection.pop_front().unwrap();
3698 (select.row_count, select.skip)
3699 }
3700 None => break,
3701 };
3702
3703 batch_remaining -= to_read;
3704
3705 match skip {
3706 true => {
3707 if let Some(last_start) = last_start.take() {
3708 expected_batches.push(column.slice(last_start, row_offset - last_start))
3709 }
3710 row_offset += to_read
3711 }
3712 false => {
3713 last_start.get_or_insert(row_offset);
3714 row_offset += to_read
3715 }
3716 }
3717 }
3718 }
3719
3720 if let Some(last_start) = last_start.take() {
3721 expected_batches.push(column.slice(last_start, row_offset - last_start))
3722 }
3723
3724 for batch in &expected_batches[..expected_batches.len() - 1] {
3726 assert_eq!(batch.num_rows(), batch_size);
3727 }
3728
3729 expected_batches
3730 }
3731
3732 fn create_test_selection(
3733 step_len: usize,
3734 total_len: usize,
3735 skip_first: bool,
3736 ) -> (RowSelection, usize) {
3737 let mut remaining = total_len;
3738 let mut skip = skip_first;
3739 let mut vec = vec![];
3740 let mut selected_count = 0;
3741 while remaining != 0 {
3742 let step = if remaining > step_len {
3743 step_len
3744 } else {
3745 remaining
3746 };
3747 vec.push(RowSelector {
3748 row_count: step,
3749 skip,
3750 });
3751 remaining -= step;
3752 if !skip {
3753 selected_count += step;
3754 }
3755 skip = !skip;
3756 }
3757 (vec.into(), selected_count)
3758 }
3759
#[test]
fn test_scan_row_with_selection() {
    // Read the whole 7300-row file up front so the expected output of a
    // selection-driven read can be computed by slicing `data`.
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
    let test_file = File::open(&path).unwrap();

    let mut serial_reader =
        ParquetRecordBatchReader::try_new(File::open(&path).unwrap(), 7300).unwrap();
    let data = serial_reader.next().unwrap().unwrap();

    // Compare a skip-enabled reader against batches sliced from `data`.
    // `selection_len` only appears in the assertion message; the selection's
    // step length is `batch_size`.
    let do_test = |batch_size: usize, selection_len: usize| {
        for skip_first in [false, true] {
            let selections = create_test_selection(batch_size, data.num_rows(), skip_first).0;

            let expected = get_expected_batches(&data, &selections, batch_size);
            let skip_reader = create_skip_reader(&test_file, batch_size, selections);
            assert_eq!(
                skip_reader.collect::<Result<Vec<_>, _>>().unwrap(),
                expected,
                "batch_size: {batch_size}, selection_len: {selection_len}, skip_first: {skip_first}"
            );
        }
    };

    // Selection runs longer than one page.
    do_test(1000, 1000);

    // Selection runs shorter than one page.
    do_test(20, 20);

    do_test(20, 5);

    // NOTE(review): identical to the call above — presumably one of the two
    // was meant to cover selection_len > batch_size; confirm intent.
    do_test(20, 5);

    fn create_skip_reader(
        test_file: &File,
        batch_size: usize,
        selections: RowSelection,
    ) -> ParquetRecordBatchReader {
        // Page index enabled so the reader can skip whole pages.
        let options = ArrowReaderOptions::new().with_page_index(true);
        let file = test_file.try_clone().unwrap();
        ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)
            .unwrap()
            .with_batch_size(batch_size)
            .with_row_selection(selections)
            .build()
            .unwrap()
    }
}
3813
3814 #[test]
3815 fn test_batch_size_overallocate() {
3816 let testdata = arrow::util::test_util::parquet_test_data();
3817 let path = format!("{testdata}/alltypes_plain.parquet");
3819 let test_file = File::open(path).unwrap();
3820
3821 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
3822 let num_rows = builder.metadata.file_metadata().num_rows();
3823 let reader = builder
3824 .with_batch_size(1024)
3825 .with_projection(ProjectionMask::all())
3826 .build()
3827 .unwrap();
3828 assert_ne!(1024, num_rows);
3829 assert_eq!(reader.batch_size, num_rows as usize);
3830 }
3831
3832 #[test]
3833 fn test_read_with_page_index_enabled() {
3834 let testdata = arrow::util::test_util::parquet_test_data();
3835
3836 {
3837 let path = format!("{testdata}/alltypes_tiny_pages.parquet");
3839 let test_file = File::open(path).unwrap();
3840 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3841 test_file,
3842 ArrowReaderOptions::new().with_page_index(true),
3843 )
3844 .unwrap();
3845 assert!(!builder.metadata().offset_index().unwrap()[0].is_empty());
3846 let reader = builder.build().unwrap();
3847 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3848 assert_eq!(batches.len(), 8);
3849 }
3850
3851 {
3852 let path = format!("{testdata}/alltypes_plain.parquet");
3854 let test_file = File::open(path).unwrap();
3855 let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
3856 test_file,
3857 ArrowReaderOptions::new().with_page_index(true),
3858 )
3859 .unwrap();
3860 assert!(builder.metadata().offset_index().is_none());
3863 let reader = builder.build().unwrap();
3864 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
3865 assert_eq!(batches.len(), 1);
3866 }
3867 }
3868
    #[test]
    fn test_raw_repetition() {
        // Writes a one-row file using the low-level column writer API with an
        // optional column, a repeated column, and a repeated group, then
        // verifies a single-leaf projection of each column matches the
        // corresponding column of the unprojected read.
        const MESSAGE_TYPE: &str = "
            message Log {
              OPTIONAL INT32 eventType;
              REPEATED INT32 category;
              REPEATED group filter {
                OPTIONAL INT32 error;
              }
            }
        ";
        let schema = Arc::new(parse_message_type(MESSAGE_TYPE).unwrap());
        let props = Default::default();

        let mut buf = Vec::with_capacity(1024);
        let mut writer = SerializedFileWriter::new(&mut buf, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        // Column 0: eventType — one defined value, no repetition levels.
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), None)
            .unwrap();
        col_writer.close().unwrap();
        // Column 1: category — two values in the same row (rep levels [0, 1]).
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1, 1], Some(&[1, 1]), Some(&[0, 1]))
            .unwrap();
        col_writer.close().unwrap();
        // Column 2: filter.error — a single defined value in the row.
        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<Int32Type>()
            .write_batch(&[1], Some(&[1]), Some(&[0]))
            .unwrap();
        col_writer.close().unwrap();

        // All three columns above belong to a single logical row.
        let rg_md = row_group_writer.close().unwrap();
        assert_eq!(rg_md.num_rows(), 1);
        writer.close().unwrap();

        let bytes = Bytes::from(buf);

        // Unprojected read: all three columns.
        let mut no_mask = ParquetRecordBatchReader::try_new(bytes.clone(), 1024).unwrap();
        let full = no_mask.next().unwrap().unwrap();

        assert_eq!(full.num_columns(), 3);

        // Each single-leaf projection must equal the matching full column.
        for idx in 0..3 {
            let b = ParquetRecordBatchReaderBuilder::try_new(bytes.clone()).unwrap();
            let mask = ProjectionMask::leaves(b.parquet_schema(), [idx]);
            let mut reader = b.with_projection(mask).build().unwrap();
            let projected = reader.next().unwrap().unwrap();

            assert_eq!(projected.num_columns(), 1);
            assert_eq!(full.column(idx), projected.column(0));
        }
    }
3930
3931 #[test]
3932 fn test_read_lz4_raw() {
3933 let testdata = arrow::util::test_util::parquet_test_data();
3934 let path = format!("{testdata}/lz4_raw_compressed.parquet");
3935 let file = File::open(path).unwrap();
3936
3937 let batches = ParquetRecordBatchReader::try_new(file, 1024)
3938 .unwrap()
3939 .collect::<Result<Vec<_>, _>>()
3940 .unwrap();
3941 assert_eq!(batches.len(), 1);
3942 let batch = &batches[0];
3943
3944 assert_eq!(batch.num_columns(), 3);
3945 assert_eq!(batch.num_rows(), 4);
3946
3947 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3949 assert_eq!(
3950 a.values(),
3951 &[1593604800, 1593604800, 1593604801, 1593604801]
3952 );
3953
3954 let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3955 let a: Vec<_> = a.iter().flatten().collect();
3956 assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
3957
3958 let a: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
3959 assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
3960 }
3961
3962 #[test]
3972 fn test_read_lz4_hadoop_fallback() {
3973 for file in [
3974 "hadoop_lz4_compressed.parquet",
3975 "non_hadoop_lz4_compressed.parquet",
3976 ] {
3977 let testdata = arrow::util::test_util::parquet_test_data();
3978 let path = format!("{testdata}/{file}");
3979 let file = File::open(path).unwrap();
3980 let expected_rows = 4;
3981
3982 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
3983 .unwrap()
3984 .collect::<Result<Vec<_>, _>>()
3985 .unwrap();
3986 assert_eq!(batches.len(), 1);
3987 let batch = &batches[0];
3988
3989 assert_eq!(batch.num_columns(), 3);
3990 assert_eq!(batch.num_rows(), expected_rows);
3991
3992 let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
3993 assert_eq!(
3994 a.values(),
3995 &[1593604800, 1593604800, 1593604801, 1593604801]
3996 );
3997
3998 let b: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
3999 let b: Vec<_> = b.iter().flatten().collect();
4000 assert_eq!(b, &[b"abc", b"def", b"abc", b"def"]);
4001
4002 let c: &Float64Array = batch.column(2).as_any().downcast_ref().unwrap();
4003 assert_eq!(c.values(), &[42.0, 7.7, 42.125, 7.7]);
4004 }
4005 }
4006
4007 #[test]
4008 fn test_read_lz4_hadoop_large() {
4009 let testdata = arrow::util::test_util::parquet_test_data();
4010 let path = format!("{testdata}/hadoop_lz4_compressed_larger.parquet");
4011 let file = File::open(path).unwrap();
4012 let expected_rows = 10000;
4013
4014 let batches = ParquetRecordBatchReader::try_new(file, expected_rows)
4015 .unwrap()
4016 .collect::<Result<Vec<_>, _>>()
4017 .unwrap();
4018 assert_eq!(batches.len(), 1);
4019 let batch = &batches[0];
4020
4021 assert_eq!(batch.num_columns(), 1);
4022 assert_eq!(batch.num_rows(), expected_rows);
4023
4024 let a: &StringArray = batch.column(0).as_any().downcast_ref().unwrap();
4025 let a: Vec<_> = a.iter().flatten().collect();
4026 assert_eq!(a[0], "c7ce6bef-d5b0-4863-b199-8ea8c7fb117b");
4027 assert_eq!(a[1], "e8fb9197-cb9f-4118-b67f-fbfa65f61843");
4028 assert_eq!(a[expected_rows - 2], "ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c");
4029 assert_eq!(a[expected_rows - 1], "85440778-460a-41ac-aa2e-ac3ee41696bf");
4030 }
4031
4032 #[test]
4033 #[cfg(feature = "snap")]
4034 fn test_read_nested_lists() {
4035 let testdata = arrow::util::test_util::parquet_test_data();
4036 let path = format!("{testdata}/nested_lists.snappy.parquet");
4037 let file = File::open(path).unwrap();
4038
4039 let f = file.try_clone().unwrap();
4040 let mut reader = ParquetRecordBatchReader::try_new(f, 60).unwrap();
4041 let expected = reader.next().unwrap().unwrap();
4042 assert_eq!(expected.num_rows(), 3);
4043
4044 let selection = RowSelection::from(vec![
4045 RowSelector::skip(1),
4046 RowSelector::select(1),
4047 RowSelector::skip(1),
4048 ]);
4049 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4050 .unwrap()
4051 .with_row_selection(selection)
4052 .build()
4053 .unwrap();
4054
4055 let actual = reader.next().unwrap().unwrap();
4056 assert_eq!(actual.num_rows(), 1);
4057 assert_eq!(actual.column(0), &expected.column(0).slice(1, 1));
4058 }
4059
4060 #[test]
4061 fn test_arbitrary_decimal() {
4062 let values = [1, 2, 3, 4, 5, 6, 7, 8];
4063 let decimals_19_0 = Decimal128Array::from_iter_values(values)
4064 .with_precision_and_scale(19, 0)
4065 .unwrap();
4066 let decimals_12_0 = Decimal128Array::from_iter_values(values)
4067 .with_precision_and_scale(12, 0)
4068 .unwrap();
4069 let decimals_17_10 = Decimal128Array::from_iter_values(values)
4070 .with_precision_and_scale(17, 10)
4071 .unwrap();
4072
4073 let written = RecordBatch::try_from_iter([
4074 ("decimal_values_19_0", Arc::new(decimals_19_0) as ArrayRef),
4075 ("decimal_values_12_0", Arc::new(decimals_12_0) as ArrayRef),
4076 ("decimal_values_17_10", Arc::new(decimals_17_10) as ArrayRef),
4077 ])
4078 .unwrap();
4079
4080 let mut buffer = Vec::with_capacity(1024);
4081 let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
4082 writer.write(&written).unwrap();
4083 writer.close().unwrap();
4084
4085 let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 8)
4086 .unwrap()
4087 .collect::<Result<Vec<_>, _>>()
4088 .unwrap();
4089
4090 assert_eq!(&written.slice(0, 8), &read[0]);
4091 }
4092
4093 #[test]
4094 fn test_list_skip() {
4095 let mut list = ListBuilder::new(Int32Builder::new());
4096 list.append_value([Some(1), Some(2)]);
4097 list.append_value([Some(3)]);
4098 list.append_value([Some(4)]);
4099 let list = list.finish();
4100 let batch = RecordBatch::try_from_iter([("l", Arc::new(list) as _)]).unwrap();
4101
4102 let props = WriterProperties::builder()
4104 .set_data_page_row_count_limit(1)
4105 .set_write_batch_size(2)
4106 .build();
4107
4108 let mut buffer = Vec::with_capacity(1024);
4109 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), Some(props)).unwrap();
4110 writer.write(&batch).unwrap();
4111 writer.close().unwrap();
4112
4113 let selection = vec![RowSelector::skip(2), RowSelector::select(1)];
4114 let mut reader = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer))
4115 .unwrap()
4116 .with_row_selection(selection.into())
4117 .build()
4118 .unwrap();
4119 let out = reader.next().unwrap().unwrap();
4120 assert_eq!(out.num_rows(), 1);
4121 assert_eq!(out, batch.slice(2, 1));
4122 }
4123
4124 fn test_decimal_roundtrip<T: DecimalType>() {
4125 let d = |values: Vec<usize>, p: u8| {
4130 let iter = values.into_iter().map(T::Native::usize_as);
4131 PrimitiveArray::<T>::from_iter_values(iter)
4132 .with_precision_and_scale(p, 2)
4133 .unwrap()
4134 };
4135
4136 let d1 = d(vec![1, 2, 3, 4, 5], 9);
4137 let d2 = d(vec![1, 2, 3, 4, 10.pow(10) - 1], 10);
4138 let d3 = d(vec![1, 2, 3, 4, 10.pow(18) - 1], 18);
4139 let d4 = d(vec![1, 2, 3, 4, 10.pow(19) - 1], 19);
4140
4141 let batch = RecordBatch::try_from_iter([
4142 ("d1", Arc::new(d1) as ArrayRef),
4143 ("d2", Arc::new(d2) as ArrayRef),
4144 ("d3", Arc::new(d3) as ArrayRef),
4145 ("d4", Arc::new(d4) as ArrayRef),
4146 ])
4147 .unwrap();
4148
4149 let mut buffer = Vec::with_capacity(1024);
4150 let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
4151 writer.write(&batch).unwrap();
4152 writer.close().unwrap();
4153
4154 let builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(buffer)).unwrap();
4155 let t1 = builder.parquet_schema().columns()[0].physical_type();
4156 assert_eq!(t1, PhysicalType::INT32);
4157 let t2 = builder.parquet_schema().columns()[1].physical_type();
4158 assert_eq!(t2, PhysicalType::INT64);
4159 let t3 = builder.parquet_schema().columns()[2].physical_type();
4160 assert_eq!(t3, PhysicalType::INT64);
4161 let t4 = builder.parquet_schema().columns()[3].physical_type();
4162 assert_eq!(t4, PhysicalType::FIXED_LEN_BYTE_ARRAY);
4163
4164 let mut reader = builder.build().unwrap();
4165 assert_eq!(batch.schema(), reader.schema());
4166
4167 let out = reader.next().unwrap().unwrap();
4168 assert_eq!(batch, out);
4169 }
4170
    #[test]
    fn test_decimal() {
        // Run the roundtrip for both supported decimal storage widths.
        test_decimal_roundtrip::<Decimal128Type>();
        test_decimal_roundtrip::<Decimal256Type>();
    }
4176
4177 #[test]
4178 fn test_list_selection() {
4179 let schema = Arc::new(Schema::new(vec![Field::new_list(
4180 "list",
4181 Field::new_list_field(ArrowDataType::Utf8, true),
4182 false,
4183 )]));
4184 let mut buf = Vec::with_capacity(1024);
4185
4186 let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();
4187
4188 for i in 0..2 {
4189 let mut list_a_builder = ListBuilder::new(StringBuilder::new());
4190 for j in 0..1024 {
4191 list_a_builder.values().append_value(format!("{i} {j}"));
4192 list_a_builder.append(true);
4193 }
4194 let batch =
4195 RecordBatch::try_new(schema.clone(), vec![Arc::new(list_a_builder.finish())])
4196 .unwrap();
4197 writer.write(&batch).unwrap();
4198 }
4199 let _metadata = writer.close().unwrap();
4200
4201 let buf = Bytes::from(buf);
4202 let reader = ParquetRecordBatchReaderBuilder::try_new(buf)
4203 .unwrap()
4204 .with_row_selection(RowSelection::from(vec![
4205 RowSelector::skip(100),
4206 RowSelector::select(924),
4207 RowSelector::skip(100),
4208 RowSelector::select(924),
4209 ]))
4210 .build()
4211 .unwrap();
4212
4213 let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
4214 let batch = concat_batches(&schema, &batches).unwrap();
4215
4216 assert_eq!(batch.num_rows(), 924 * 2);
4217 let list = batch.column(0).as_list::<i32>();
4218
4219 for w in list.value_offsets().windows(2) {
4220 assert_eq!(w[0] + 1, w[1])
4221 }
4222 let mut values = list.values().as_string::<i32>().iter();
4223
4224 for i in 0..2 {
4225 for j in 100..1024 {
4226 let expected = format!("{i} {j}");
4227 assert_eq!(values.next().unwrap().unwrap(), &expected);
4228 }
4229 }
4230 }
4231
    #[test]
    fn test_list_selection_fuzz() {
        // Fuzzes row selection over a randomly generated list-of-list-of-int
        // column: 2048 rows with random nulls and lengths at every nesting
        // level, read back under several fixed skip/select patterns and batch
        // sizes, comparing each selected range against the written batch.
        let mut rng = rng();
        let schema = Arc::new(Schema::new(vec![Field::new_list(
            "list",
            Field::new_list(
                Field::LIST_FIELD_DEFAULT_NAME,
                Field::new_list_field(ArrowDataType::Int32, true),
                true,
            ),
            true,
        )]));
        let mut buf = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap();

        let mut list_a_builder = ListBuilder::new(ListBuilder::new(Int32Builder::new()));

        for _ in 0..2048 {
            // ~20% of outer rows are null lists.
            if rng.random_bool(0.2) {
                list_a_builder.append(false);
                continue;
            }

            let list_a_len = rng.random_range(0..10);
            let list_b_builder = list_a_builder.values();

            for _ in 0..list_a_len {
                // ~20% of inner lists are null.
                if rng.random_bool(0.2) {
                    list_b_builder.append(false);
                    continue;
                }

                let list_b_len = rng.random_range(0..10);
                let int_builder = list_b_builder.values();
                for _ in 0..list_b_len {
                    // ~20% of leaf values are null.
                    match rng.random_bool(0.2) {
                        true => int_builder.append_null(),
                        false => int_builder.append_value(rng.random()),
                    }
                }
                list_b_builder.append(true)
            }
            list_a_builder.append(true);
        }

        let array = Arc::new(list_a_builder.finish());
        let batch = RecordBatch::try_new(schema, vec![array]).unwrap();

        writer.write(&batch).unwrap();
        let _metadata = writer.close().unwrap();

        let buf = Bytes::from(buf);

        // Selections exercising interior, leading, and trailing skips, plus
        // single-row selections at the extremes.
        let cases = [
            vec![
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
            ],
            vec![
                RowSelector::select(924),
                RowSelector::skip(100),
                RowSelector::select(924),
                RowSelector::skip(100),
            ],
            vec![
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
            ],
            vec![
                RowSelector::select(1),
                RowSelector::skip(1023),
                RowSelector::select(1),
                RowSelector::skip(1023),
            ],
        ];

        for batch_size in [100, 1024, 2048] {
            for selection in &cases {
                let selection = RowSelection::from(selection.clone());
                let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
                    .unwrap()
                    .with_row_selection(selection.clone())
                    .with_batch_size(batch_size)
                    .build()
                    .unwrap();

                let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
                let actual = concat_batches(batch.schema_ref(), &batches).unwrap();
                assert_eq!(actual.num_rows(), selection.row_count());

                // Walk the selection: `batch_offset` tracks position in the
                // written data, `actual_offset` in the concatenated result.
                let mut batch_offset = 0;
                let mut actual_offset = 0;
                for selector in selection.iter() {
                    if selector.skip {
                        batch_offset += selector.row_count;
                        continue;
                    }

                    assert_eq!(
                        batch.slice(batch_offset, selector.row_count),
                        actual.slice(actual_offset, selector.row_count)
                    );

                    batch_offset += selector.row_count;
                    actual_offset += selector.row_count;
                }
            }
        }
    }
4345
4346 #[test]
4347 fn test_read_old_nested_list() {
4348 use arrow::datatypes::DataType;
4349 use arrow::datatypes::ToByteSlice;
4350
4351 let testdata = arrow::util::test_util::parquet_test_data();
4352 let path = format!("{testdata}/old_list_structure.parquet");
4361 let test_file = File::open(path).unwrap();
4362
4363 let a_values = Int32Array::from(vec![1, 2, 3, 4]);
4365
4366 let a_value_offsets = arrow::buffer::Buffer::from([0, 2, 4].to_byte_slice());
4368
4369 let a_list_data = ArrayData::builder(DataType::List(Arc::new(Field::new(
4371 "array",
4372 DataType::Int32,
4373 false,
4374 ))))
4375 .len(2)
4376 .add_buffer(a_value_offsets)
4377 .add_child_data(a_values.into_data())
4378 .build()
4379 .unwrap();
4380 let a = ListArray::from(a_list_data);
4381
4382 let builder = ParquetRecordBatchReaderBuilder::try_new(test_file).unwrap();
4383 let mut reader = builder.build().unwrap();
4384 let out = reader.next().unwrap().unwrap();
4385 assert_eq!(out.num_rows(), 1);
4386 assert_eq!(out.num_columns(), 1);
4387 let c0 = out.column(0);
4389 let c0arr = c0.as_any().downcast_ref::<ListArray>().unwrap();
4390 let r0 = c0arr.value(0);
4392 let r0arr = r0.as_any().downcast_ref::<ListArray>().unwrap();
4393 assert_eq!(r0arr, &a);
4394 }
4395
4396 #[test]
4397 fn test_map_no_value() {
4398 let testdata = arrow::util::test_util::parquet_test_data();
4418 let path = format!("{testdata}/map_no_value.parquet");
4419 let file = File::open(path).unwrap();
4420
4421 let mut reader = ParquetRecordBatchReaderBuilder::try_new(file)
4422 .unwrap()
4423 .build()
4424 .unwrap();
4425 let out = reader.next().unwrap().unwrap();
4426 assert_eq!(out.num_rows(), 3);
4427 assert_eq!(out.num_columns(), 3);
4428 let c0 = out.column(1).as_list::<i32>();
4430 let c1 = out.column(2).as_list::<i32>();
4431 assert_eq!(c0.len(), c1.len());
4432 c0.iter().zip(c1.iter()).for_each(|(l, r)| assert_eq!(l, r));
4433 }
4434}