// parquet/arrow/array_reader/mod.rs
use crate::errors::Result;
21use arrow_array::ArrayRef;
22use arrow_schema::DataType as ArrowType;
23use std::any::Any;
24use std::sync::Arc;
25
26use crate::arrow::record_reader::buffer::ValuesBuffer;
27use crate::arrow::record_reader::GenericRecordReader;
28use crate::column::page::PageIterator;
29use crate::column::reader::decoder::ColumnValueDecoder;
30use crate::file::reader::{FilePageIterator, FileReader};
31
32mod builder;
33mod byte_array;
34mod byte_array_dictionary;
35mod byte_view_array;
36mod empty_array;
37mod fixed_len_byte_array;
38mod fixed_size_list_array;
39mod list_array;
40mod map_array;
41mod null_array;
42mod primitive_array;
43mod struct_array;
44
45#[cfg(test)]
46mod test_util;
47
48pub use builder::ArrayReaderBuilder;
50pub use byte_array::make_byte_array_reader;
51pub use byte_array_dictionary::make_byte_array_dictionary_reader;
52#[allow(unused_imports)] pub use byte_view_array::make_byte_view_array_reader;
54#[allow(unused_imports)] pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
56pub use fixed_size_list_array::FixedSizeListArrayReader;
57pub use list_array::ListArrayReader;
58pub use map_array::MapArrayReader;
59pub use null_array::NullArrayReader;
60pub use primitive_array::PrimitiveArrayReader;
61pub use struct_array::StructArrayReader;
62
/// Reads Parquet column data and produces Arrow [`ArrayRef`]s.
///
/// Implementations are type-specific (primitive, byte array, list, struct, ...)
/// and are re-exported from the submodules of this module; readers are normally
/// constructed via [`ArrayReaderBuilder`].
pub trait ArrayReader: Send {
    /// Returns `self` as [`Any`] so callers can downcast to a concrete reader type.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;

    /// Returns the Arrow [`DataType`](ArrowType) of the arrays produced by this reader.
    fn get_data_type(&self) -> &ArrowType;

    /// Reads up to `batch_size` records and returns them as an [`ArrayRef`].
    ///
    /// Convenience wrapper: calls [`read_records`](Self::read_records) then
    /// [`consume_batch`](Self::consume_batch). Only compiled under tests or the
    /// `experimental` feature.
    #[cfg(any(feature = "experimental", test))]
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        self.read_records(batch_size)?;
        self.consume_batch()
    }

    /// Buffers up to `batch_size` records, returning the number actually read
    /// (may be fewer than requested, e.g. at end of input).
    fn read_records(&mut self, batch_size: usize) -> Result<usize>;

    /// Converts the records buffered by prior [`read_records`](Self::read_records)
    /// calls into an [`ArrayRef`].
    fn consume_batch(&mut self) -> Result<ArrayRef>;

    /// Skips over `num_records` records, returning the number actually skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize>;

    /// Definition levels of the buffered data, or `None` if the reader does not
    /// track them (e.g. a required, non-nested column).
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// Repetition levels of the buffered data, or `None` if the reader does not
    /// track them (e.g. a non-repeated column).
    fn get_rep_levels(&self) -> Option<&[i16]>;
}
109
/// A source of Parquet row groups from which column pages can be read.
pub trait RowGroups {
    /// Returns the total number of rows across all row groups in this source.
    fn num_rows(&self) -> usize;

    /// Returns a [`PageIterator`] over the pages of the column with index `i`.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}
119
120impl RowGroups for Arc<dyn FileReader> {
121 fn num_rows(&self) -> usize {
122 self.metadata().file_metadata().num_rows() as usize
123 }
124
125 fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
126 let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
127 Ok(Box::new(iterator))
128 }
129}
130
131fn read_records<V, CV>(
136 record_reader: &mut GenericRecordReader<V, CV>,
137 pages: &mut dyn PageIterator,
138 batch_size: usize,
139) -> Result<usize>
140where
141 V: ValuesBuffer,
142 CV: ColumnValueDecoder<Buffer = V>,
143{
144 let mut records_read = 0usize;
145 while records_read < batch_size {
146 let records_to_read = batch_size - records_read;
147
148 let records_read_once = record_reader.read_records(records_to_read)?;
149 records_read += records_read_once;
150
151 if records_read_once < records_to_read {
153 if let Some(page_reader) = pages.next() {
154 record_reader.set_page_reader(page_reader?)?;
156 } else {
157 break;
159 }
160 }
161 }
162 Ok(records_read)
163}
164
165fn skip_records<V, CV>(
170 record_reader: &mut GenericRecordReader<V, CV>,
171 pages: &mut dyn PageIterator,
172 batch_size: usize,
173) -> Result<usize>
174where
175 V: ValuesBuffer,
176 CV: ColumnValueDecoder<Buffer = V>,
177{
178 let mut records_skipped = 0usize;
179 while records_skipped < batch_size {
180 let records_to_read = batch_size - records_skipped;
181
182 let records_skipped_once = record_reader.skip_records(records_to_read)?;
183 records_skipped += records_skipped_once;
184
185 if records_skipped_once < records_to_read {
187 if let Some(page_reader) = pages.next() {
188 record_reader.set_page_reader(page_reader?)?;
190 } else {
191 break;
193 }
194 }
195 }
196 Ok(records_skipped)
197}