use crate::errors::Result;
21use arrow_array::ArrayRef;
22use arrow_schema::DataType as ArrowType;
23use std::any::Any;
24use std::sync::Arc;
25
26use crate::arrow::record_reader::buffer::ValuesBuffer;
27use crate::arrow::record_reader::GenericRecordReader;
28use crate::column::page::PageIterator;
29use crate::column::reader::decoder::ColumnValueDecoder;
30use crate::file::reader::{FilePageIterator, FileReader};
31
32mod builder;
33mod byte_array;
34mod byte_array_dictionary;
35mod byte_view_array;
36mod empty_array;
37mod fixed_len_byte_array;
38mod fixed_size_list_array;
39mod list_array;
40mod map_array;
41mod null_array;
42mod primitive_array;
43mod struct_array;
44
45#[cfg(test)]
46mod test_util;
47
48pub(crate) use builder::ArrayReaderBuilder;
49pub use byte_array::make_byte_array_reader;
50pub use byte_array_dictionary::make_byte_array_dictionary_reader;
51#[allow(unused_imports)] pub use byte_view_array::make_byte_view_array_reader;
53#[allow(unused_imports)] pub use fixed_len_byte_array::make_fixed_len_byte_array_reader;
55pub use fixed_size_list_array::FixedSizeListArrayReader;
56pub use list_array::ListArrayReader;
57pub use map_array::MapArrayReader;
58pub use null_array::NullArrayReader;
59pub use primitive_array::PrimitiveArrayReader;
60pub use struct_array::StructArrayReader;
61
/// Reads parquet data for one column (or composite of columns) into an arrow [`ArrayRef`].
pub trait ArrayReader: Send {
    // Kept for downcasting in tests/experimental code; not exercised by the main read path.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;

    /// Returns the arrow [`ArrowType`] produced by this reader.
    fn get_data_type(&self) -> &ArrowType;

    /// Convenience wrapper: read up to `batch_size` records and return the
    /// resulting array (equivalent to `read_records` followed by `consume_batch`).
    #[cfg(any(feature = "experimental", test))]
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
        self.read_records(batch_size)?;
        self.consume_batch()
    }

    /// Buffers up to `batch_size` records, returning the number actually read.
    /// NOTE(review): presumably fewer than `batch_size` are returned when the
    /// column data is exhausted — confirm against implementations.
    fn read_records(&mut self, batch_size: usize) -> Result<usize>;

    /// Converts everything buffered by prior `read_records` calls into an [`ArrayRef`].
    fn consume_batch(&mut self) -> Result<ArrayRef>;

    /// Skips over `num_records` records, returning the number actually skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize>;

    /// Definition levels for the buffered records, if this column is nullable/nested.
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// Repetition levels for the buffered records, if this column is repeated/nested.
    fn get_rep_levels(&self) -> Option<&[i16]>;
}
108
/// A collection of row groups that pages can be read from.
pub trait RowGroups {
    /// Total number of rows in this collection.
    fn num_rows(&self) -> usize;

    /// Returns a [`PageIterator`] over all pages of column `i` across the row groups.
    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>>;
}
118
119impl RowGroups for Arc<dyn FileReader> {
120 fn num_rows(&self) -> usize {
121 self.metadata().file_metadata().num_rows() as usize
122 }
123
124 fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
125 let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
126 Ok(Box::new(iterator))
127 }
128}
129
130fn read_records<V, CV>(
135 record_reader: &mut GenericRecordReader<V, CV>,
136 pages: &mut dyn PageIterator,
137 batch_size: usize,
138) -> Result<usize>
139where
140 V: ValuesBuffer,
141 CV: ColumnValueDecoder<Buffer = V>,
142{
143 let mut records_read = 0usize;
144 while records_read < batch_size {
145 let records_to_read = batch_size - records_read;
146
147 let records_read_once = record_reader.read_records(records_to_read)?;
148 records_read += records_read_once;
149
150 if records_read_once < records_to_read {
152 if let Some(page_reader) = pages.next() {
153 record_reader.set_page_reader(page_reader?)?;
155 } else {
156 break;
158 }
159 }
160 }
161 Ok(records_read)
162}
163
164fn skip_records<V, CV>(
169 record_reader: &mut GenericRecordReader<V, CV>,
170 pages: &mut dyn PageIterator,
171 batch_size: usize,
172) -> Result<usize>
173where
174 V: ValuesBuffer,
175 CV: ColumnValueDecoder<Buffer = V>,
176{
177 let mut records_skipped = 0usize;
178 while records_skipped < batch_size {
179 let records_to_read = batch_size - records_skipped;
180
181 let records_skipped_once = record_reader.skip_records(records_to_read)?;
182 records_skipped += records_skipped_once;
183
184 if records_skipped_once < records_to_read {
186 if let Some(page_reader) = pages.next() {
187 record_reader.set_page_reader(page_reader?)?;
189 } else {
190 break;
192 }
193 }
194 }
195 Ok(records_skipped)
196}