parquet/arrow/array_reader/
struct_array.rs1use crate::arrow::array_reader::ArrayReader;
19use crate::errors::{ParquetError, Result};
20use arrow_array::{Array, ArrayRef, StructArray, builder::BooleanBufferBuilder};
21use arrow_buffer::NullBuffer;
22use arrow_schema::{DataType as ArrowType, DataType};
23use std::any::Any;
24use std::sync::Arc;
25
26pub struct StructArrayReader {
28 children: Vec<Box<dyn ArrayReader>>,
29 data_type: ArrowType,
30 struct_def_level: i16,
31 struct_rep_level: i16,
32 nullable: bool,
33}
34
35impl StructArrayReader {
36 pub fn new(
38 data_type: ArrowType,
39 children: Vec<Box<dyn ArrayReader>>,
40 def_level: i16,
41 rep_level: i16,
42 nullable: bool,
43 ) -> Self {
44 Self {
45 data_type,
46 children,
47 struct_def_level: def_level,
48 struct_rep_level: rep_level,
49 nullable,
50 }
51 }
52}
53
54impl ArrayReader for StructArrayReader {
55 fn as_any(&self) -> &dyn Any {
56 self
57 }
58
59 fn get_data_type(&self) -> &ArrowType {
62 &self.data_type
63 }
64
65 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
66 let mut read = None;
67 for child in self.children.iter_mut() {
68 let child_read = child.read_records(batch_size)?;
69 match read {
70 Some(expected) => {
71 if expected != child_read {
72 return Err(general_err!(
73 "StructArrayReader out of sync in read_records, expected {} read, got {}",
74 expected,
75 child_read
76 ));
77 }
78 }
79 None => read = Some(child_read),
80 }
81 }
82 Ok(read.unwrap_or(0))
83 }
84
85 fn consume_batch(&mut self) -> Result<ArrayRef> {
104 if self.children.is_empty() {
105 return Ok(Arc::new(StructArray::from(Vec::new())));
106 }
107
108 let children_array = self
109 .children
110 .iter_mut()
111 .map(|reader| reader.consume_batch())
112 .collect::<Result<Vec<_>>>()?;
113
114 let children_array_len = children_array
116 .first()
117 .map(|arr| arr.len())
118 .ok_or_else(|| general_err!("Struct array reader should have at least one child!"))?;
119
120 let all_children_len_eq = children_array
121 .iter()
122 .all(|arr| arr.len() == children_array_len);
123 if !all_children_len_eq {
124 return Err(general_err!("Not all children array length are the same!"));
125 }
126
127 let DataType::Struct(fields) = &self.data_type else {
128 return Err(general_err!(
129 "Internal: StructArrayReader must have struct data type, got {:?}",
130 self.data_type
131 ));
132 };
133 let fields = fields.clone(); let mut nulls = None;
136 if self.nullable {
137 let def_levels = self.children[0]
141 .get_def_levels()
142 .expect("child with nullable parents must have definition level");
143
144 let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
146
147 match self.children[0].get_rep_levels() {
148 Some(rep_levels) => {
149 assert_eq!(rep_levels.len(), def_levels.len());
151
152 for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
153 if rep_level > &self.struct_rep_level {
154 continue;
156 }
157 bitmap_builder.append(*def_level >= self.struct_def_level)
158 }
159 }
160 None => {
161 unsafe {
163 bitmap_builder.extend_trusted_len(
164 def_levels
165 .iter()
166 .map(|level| *level >= self.struct_def_level),
167 )
168 }
169 }
170 }
171
172 if bitmap_builder.len() != children_array_len {
173 return Err(general_err!("Failed to decode level data for struct array"));
174 }
175 nulls = Some(NullBuffer::from(bitmap_builder));
176 }
177
178 unsafe {
181 Ok(Arc::new(StructArray::new_unchecked_with_length(
182 fields,
183 children_array,
184 nulls,
185 children_array_len,
186 )))
187 }
188 }
189
190 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
191 let mut skipped = None;
192 for child in self.children.iter_mut() {
193 let child_skipped = child.skip_records(num_records)?;
194 match skipped {
195 Some(expected) => {
196 if expected != child_skipped {
197 return Err(general_err!(
198 "StructArrayReader out of sync, expected {} skipped, got {}",
199 expected,
200 child_skipped
201 ));
202 }
203 }
204 None => skipped = Some(child_skipped),
205 }
206 }
207 Ok(skipped.unwrap_or(0))
208 }
209
210 fn get_def_levels(&self) -> Option<&[i16]> {
211 self.children.first().and_then(|l| l.get_def_levels())
214 }
215
216 fn get_rep_levels(&self) -> Option<&[i16]> {
217 self.children.first().and_then(|l| l.get_rep_levels())
220 }
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::ListArrayReader;
    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
    use arrow::buffer::Buffer;
    use arrow::datatypes::Field;
    use arrow_array::cast::AsArray;
    use arrow_array::{Array, Int32Array, ListArray};
    use arrow_schema::Fields;

    /// Two Int32 children under a nullable struct (def level 1, rep level 1):
    /// only the entry whose definition level is 0 comes out null, and the
    /// reader exposes the first child's levels.
    #[test]
    fn test_struct_array_reader() {
        let first_values = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let second_values = Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]));
        let shared_rep_levels = vec![0, 1, 1, 1, 1];

        let first_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            Arc::clone(&first_values),
            Some(vec![0, 1, 2, 3, 1]),
            Some(shared_rep_levels.clone()),
        );
        let second_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            Arc::clone(&second_values),
            Some(vec![0, 1, 3, 1, 2]),
            Some(shared_rep_levels.clone()),
        );

        let struct_type = ArrowType::Struct(Fields::from(vec![
            Field::new("f1", first_values.data_type().clone(), true),
            Field::new("f2", second_values.data_type().clone(), true),
        ]));

        let mut reader = StructArrayReader::new(
            struct_type,
            vec![Box::new(first_reader), Box::new(second_reader)],
            1,
            1,
            true,
        );

        let batch = reader.next_batch(5).unwrap();
        let batch = batch.as_struct();

        assert_eq!(batch.len(), 5);
        let null_mask: Vec<bool> = (0..batch.len()).map(|i| batch.is_null(i)).collect();
        assert_eq!(null_mask, vec![true, false, false, false, false]);
        assert_eq!(
            reader.get_def_levels(),
            Some([0i16, 1, 2, 3, 1].as_slice())
        );
        assert_eq!(reader.get_rep_levels(), Some(shared_rep_levels.as_slice()));
    }

    /// A nullable list nested in a nullable struct: the struct's validity is
    /// derived from the list reader's levels, ignoring entries that repeat
    /// inside a list.
    #[test]
    fn test_struct_array_reader_list() {
        use arrow::datatypes::Int32Type;

        let expected_list = Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
            Some(vec![Some(1), Some(2), None]),
            Some(vec![]),
            None,
            None,
        ]));

        let struct_validity = Buffer::from([0b00000111]);
        let expected = StructArray::from((
            vec![(
                Arc::new(Field::new("foo", expected_list.data_type().clone(), true)),
                expected_list.clone() as ArrayRef,
            )],
            struct_validity,
        ));

        let leaf_values = Arc::new(Int32Array::from_iter(vec![
            Some(1),
            Some(2),
            None,
            None,
            None,
            None,
        ]));
        let leaf_reader = InMemoryArrayReader::new(
            ArrowType::Int32,
            leaf_values,
            Some(vec![4, 4, 3, 2, 1, 0]),
            Some(vec![0, 1, 1, 0, 0, 0]),
        );

        let list_reader = ListArrayReader::<i32>::new(
            Box::new(leaf_reader),
            expected_list.data_type().clone(),
            3,
            1,
            true,
        );

        let mut struct_reader = StructArrayReader::new(
            expected.data_type().clone(),
            vec![Box::new(list_reader)],
            1,
            0,
            true,
        );

        let batch = struct_reader.next_batch(1024).unwrap();
        assert_eq!(batch.as_struct(), &expected)
    }
}