parquet/arrow/array_reader/
null_array.rs1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::record_reader::RecordReader;
20use crate::column::page::PageIterator;
21use crate::data_type::DataType;
22use crate::errors::Result;
23use crate::schema::types::ColumnDescPtr;
24use arrow_array::ArrayRef;
25use arrow_buffer::ArrowNativeType;
26use arrow_schema::DataType as ArrowType;
27use std::any::Any;
28use std::sync::Arc;
29
30pub struct NullArrayReader<T>
33where
34 T: DataType,
35 T::T: ArrowNativeType,
36{
37 data_type: ArrowType,
38 pages: Box<dyn PageIterator>,
39 def_levels_buffer: Option<Vec<i16>>,
40 rep_levels_buffer: Option<Vec<i16>>,
41 record_reader: RecordReader<T>,
42}
43
44impl<T> NullArrayReader<T>
45where
46 T: DataType,
47 T::T: ArrowNativeType,
48{
49 pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
51 let record_reader = RecordReader::<T>::new(column_desc);
52
53 Ok(Self {
54 data_type: ArrowType::Null,
55 pages,
56 def_levels_buffer: None,
57 rep_levels_buffer: None,
58 record_reader,
59 })
60 }
61}
62
63impl<T> ArrayReader for NullArrayReader<T>
65where
66 T: DataType,
67 T::T: ArrowNativeType,
68{
69 fn as_any(&self) -> &dyn Any {
70 self
71 }
72
73 fn get_data_type(&self) -> &ArrowType {
75 &self.data_type
76 }
77
78 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
79 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
80 }
81
82 fn consume_batch(&mut self) -> Result<ArrayRef> {
83 let array = arrow_array::NullArray::new(self.record_reader.num_values());
85
86 self.def_levels_buffer = self.record_reader.consume_def_levels();
88 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
89
90 self.record_reader.consume_bitmap_buffer();
92
93 self.record_reader.reset();
94 Ok(Arc::new(array))
95 }
96
97 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
98 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
99 }
100
101 fn get_def_levels(&self) -> Option<&[i16]> {
102 self.def_levels_buffer.as_deref()
103 }
104
105 fn get_rep_levels(&self) -> Option<&[i16]> {
106 self.rep_levels_buffer.as_deref()
107 }
108}