parquet/arrow/array_reader/mod.rs

use crate::errors::Result;
use arrow_array::ArrayRef;
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;

use crate::arrow::record_reader::buffer::ValuesBuffer;
use crate::arrow::record_reader::GenericRecordReader;
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::file::reader::{FilePageIterator, FileReader};

mod builder;
mod byte_array;
mod byte_array_dictionary;
mod byte_view_array;
mod empty_array;
mod fixed_len_byte_array;
mod fixed_size_list_array;
mod list_array;
mod map_array;
mod null_array;
mod primitive_array;
mod struct_array;

#[cfg(test)]
mod test_util;

pub use builder::build_array_reader;
pub use byte_array::make_byte_array_reader;
pub use byte_array_dictionary::make_byte_array_dictionary_reader;
#[allow(unused_imports)]
pub use byte_view_array::make_byte_view_array_reader;
#[allow(unused_imports)]
pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
pub use fixed_size_list_array::FixedSizeListArrayReader;
pub use list_array::ListArrayReader;
pub use map_array::MapArrayReader;
pub use null_array::NullArrayReader;
pub use primitive_array::PrimitiveArrayReader;
pub use struct_array::StructArrayReader;

/// An `ArrayReader` reads Parquet data for a single leaf column (or a nested group
/// of columns) and produces Arrow [`ArrayRef`] batches.
pub trait ArrayReader: Send {
    /// Returns the reader as [`Any`], allowing it to be downcast to a concrete
    /// implementation.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;

    /// Returns the Arrow data type produced by this reader.
    fn get_data_type(&self) -> &ArrowType;

    /// Reads at most `batch_size` records and returns them as an Arrow array,
    /// combining [`Self::read_records`] and [`Self::consume_batch`].
    #[cfg(any(feature = "experimental", test))]
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        self.read_records(batch_size)?;
        self.consume_batch()
    }

    /// Buffers at most `batch_size` records, returning the number of records
    /// actually read, which may be less if the underlying pages are exhausted.
    fn read_records(&mut self, batch_size: usize) -> Result<usize>;

    /// Converts all currently buffered records into an Arrow array and returns it.
    fn consume_batch(&mut self) -> Result<ArrayRef>;

    /// Skips over `num_records` records, returning the number of records actually
    /// skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize>;

    /// Returns the definition levels of the most recently read records, if this
    /// reader tracks them (i.e. the column is nullable or nested), otherwise `None`.
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// Returns the repetition levels of the most recently read records, if this
    /// reader tracks them (i.e. the column is repeated or nested), otherwise `None`.
    fn get_rep_levels(&self) -> Option<&[i16]>;
}
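
// A minimal sketch (not part of this module's API) of one way a caller might drive
// an `ArrayReader`: buffer records with `read_records`, then materialize them with
// `consume_batch`, until the reader is exhausted. The helper name `read_in_batches`
// is hypothetical.
#[allow(dead_code)]
fn read_in_batches(reader: &mut dyn ArrayReader, batch_size: usize) -> Result<Vec<ArrayRef>> {
    let mut batches = Vec::new();
    loop {
        // `read_records` reports how many records were actually buffered; zero
        // means the underlying pages are exhausted.
        let read = reader.read_records(batch_size)?;
        if read == 0 {
            break;
        }
        // Convert everything buffered so far into a single Arrow array.
        batches.push(reader.consume_batch()?);
    }
    Ok(batches)
}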

/// A collection of Parquet row groups from which column pages can be read.
pub trait RowGroups {
    /// Returns the total number of rows across all row groups in this collection.
    fn num_rows(&self) -> usize;

    /// Returns a [`PageIterator`] over the pages of the leaf column with index `i`,
    /// spanning all row groups in this collection.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}

impl RowGroups for Arc<dyn FileReader> {
    fn num_rows(&self) -> usize {
        self.metadata().file_metadata().num_rows() as usize
    }

    fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
        let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
        Ok(Box::new(iterator))
    }
}
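
// A minimal sketch (not part of this module) showing how callers can be written
// against the `RowGroups` abstraction rather than a concrete `FileReader`; the
// `Arc<dyn FileReader>` impl above is one provider. The function name
// `column_pages_and_row_count` is hypothetical.
#[allow(dead_code)]
fn column_pages_and_row_count(
    row_groups: &dyn RowGroups,
    column_index: usize,
) -> Result<(usize, Box<dyn PageIterator>)> {
    // `column_chunks` yields one page reader per row group for the requested leaf column.
    let pages = row_groups.column_chunks(column_index)?;
    Ok((row_groups.num_rows(), pages))
}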

/// Uses `record_reader` to read up to `batch_size` records from `pages`, advancing
/// to the next page reader whenever the current one is exhausted.
///
/// Returns the number of records read, which may be less than `batch_size` if the
/// pages are exhausted.
fn read_records<V, CV>(
    record_reader: &mut GenericRecordReader<V, CV>,
    pages: &mut dyn PageIterator,
    batch_size: usize,
) -> Result<usize>
where
    V: ValuesBuffer,
    CV: ColumnValueDecoder<Buffer = V>,
{
    let mut records_read = 0usize;
    while records_read < batch_size {
        let records_to_read = batch_size - records_read;

        let records_read_once = record_reader.read_records(records_to_read)?;
        records_read += records_read_once;

        // If the current page reader could not satisfy the request, move on to the
        // next page reader; if there are no more pages, stop early.
        if records_read_once < records_to_read {
            if let Some(page_reader) = pages.next() {
                record_reader.set_page_reader(page_reader?)?;
            } else {
                break;
            }
        }
    }
    Ok(records_read)
}

/// Uses `record_reader` to skip up to `batch_size` records from `pages`, advancing
/// to the next page reader whenever the current one is exhausted.
///
/// Returns the number of records skipped, which may be less than `batch_size` if the
/// pages are exhausted.
fn skip_records<V, CV>(
    record_reader: &mut GenericRecordReader<V, CV>,
    pages: &mut dyn PageIterator,
    batch_size: usize,
) -> Result<usize>
where
    V: ValuesBuffer,
    CV: ColumnValueDecoder<Buffer = V>,
{
    let mut records_skipped = 0usize;
    while records_skipped < batch_size {
        let records_to_read = batch_size - records_skipped;

        let records_skipped_once = record_reader.skip_records(records_to_read)?;
        records_skipped += records_skipped_once;

        // If the current page reader is exhausted before the request is satisfied,
        // move on to the next page reader; if there are no more pages, stop early.
        if records_skipped_once < records_to_read {
            if let Some(page_reader) = pages.next() {
                record_reader.set_page_reader(page_reader?)?;
            } else {
                break;
            }
        }
    }
    Ok(records_skipped)
}