// parquet/arrow/array_reader/map_array.rs

use crate::arrow::array_reader::{ArrayReader, ListArrayReader, StructArrayReader};
use crate::errors::Result;
use arrow_array::{Array, ArrayRef, MapArray};
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;

/// [`ArrayReader`] that reads a parquet map column as an Arrow [`MapArray`]
///
/// Parquet encodes a map as a repeated group of key/value entries, which is
/// structurally identical to a list of two-field structs, so this reader
/// simply wraps a [`ListArrayReader`] over a [`StructArrayReader`].
pub struct MapArrayReader {
    /// The Arrow `Map` type returned from `get_data_type`
    data_type: ArrowType,
    /// Inner list reader that performs the actual column decoding
    reader: ListArrayReader<i32>,
}
30
31impl MapArrayReader {
32 pub fn new(
35 key_reader: Box<dyn ArrayReader>,
36 value_reader: Box<dyn ArrayReader>,
37 data_type: ArrowType,
38 def_level: i16,
39 rep_level: i16,
40 nullable: bool,
41 ) -> Self {
42 let struct_def_level = match nullable {
43 true => def_level + 2,
44 false => def_level + 1,
45 };
46 let struct_rep_level = rep_level + 1;
47
48 let element = match &data_type {
49 ArrowType::Map(element, _) => match element.data_type() {
50 ArrowType::Struct(fields) if fields.len() == 2 => {
51 assert!(!element.is_nullable(), "map struct cannot be nullable");
55 element
56 }
57 _ => unreachable!("expected struct with two fields"),
58 },
59 _ => unreachable!("expected map type"),
60 };
61
62 let struct_reader = StructArrayReader::new(
63 element.data_type().clone(),
64 vec![key_reader, value_reader],
65 struct_def_level,
66 struct_rep_level,
67 false,
68 );
69
70 let reader = ListArrayReader::new(
71 Box::new(struct_reader),
72 ArrowType::List(element.clone()),
73 def_level,
74 rep_level,
75 nullable,
76 );
77
78 Self { data_type, reader }
79 }
80}
81
82impl ArrayReader for MapArrayReader {
83 fn as_any(&self) -> &dyn Any {
84 self
85 }
86
87 fn get_data_type(&self) -> &ArrowType {
88 &self.data_type
89 }
90
91 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
92 self.reader.read_records(batch_size)
93 }
94
95 fn consume_batch(&mut self) -> Result<ArrayRef> {
96 let array = self.reader.consume_batch().unwrap();
99 let data = array.to_data();
100 let builder = data.into_builder().data_type(self.data_type.clone());
101
102 Ok(Arc::new(MapArray::from(unsafe {
106 builder.build_unchecked()
107 })))
108 }
109
110 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
111 self.reader.skip_records(num_records)
112 }
113
114 fn get_def_levels(&self) -> Option<&[i16]> {
115 self.reader.get_def_levels()
116 }
117
118 fn get_rep_levels(&self) -> Option<&[i16]> {
119 self.reader.get_rep_levels()
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use crate::arrow::ArrowWriter;
    use arrow::datatypes::{Field, Int32Type, Schema};
    use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
    use arrow_array::cast::*;
    use arrow_array::RecordBatch;
    use arrow_schema::Fields;
    use bytes::Bytes;

    /// Round-trips a nullable map<utf8, int32> column through the parquet
    /// writer and reader, verifying null handling and both the key and value
    /// entries of the populated row.
    #[test]
    fn read_map_array_column() {
        // Schema with a single nullable map column
        let schema = Schema::new(vec![Field::new(
            "map",
            ArrowType::Map(
                Arc::new(Field::new(
                    "entries",
                    ArrowType::Struct(Fields::from(vec![
                        Field::new("keys", ArrowType::Utf8, false),
                        Field::new("values", ArrowType::Int32, true),
                    ])),
                    false,
                )),
                false,
            ),
            true,
        )]);

        let string_builder = StringBuilder::new();
        let ints_builder: PrimitiveBuilder<Int32Type> = PrimitiveBuilder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, ints_builder);

        // Two null maps followed by one populated map of five entries
        map_builder.append(false).expect("adding null map entry");
        map_builder.append(false).expect("adding null map entry");
        map_builder.keys().append_value("three");
        map_builder.keys().append_value("four");
        map_builder.keys().append_value("five");
        map_builder.keys().append_value("six");
        map_builder.keys().append_value("seven");

        map_builder.values().append_value(3);
        map_builder.values().append_value(4);
        map_builder.values().append_value(5);
        map_builder.values().append_value(6);
        map_builder.values().append_value(7);
        map_builder.append(true).expect("adding map entry");

        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_builder.finish())])
            .expect("create record batch");

        // Write the batch to an in-memory parquet file
        let mut buffer = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buffer, batch.schema(), None).expect("create file writer");
        writer.write(&batch).expect("writing file");
        writer.close().expect("close writer");

        // Read the file back and verify the decoded map contents
        let reader = Bytes::from(buffer);
        let record_batch_reader = ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
        for maybe_record_batch in record_batch_reader {
            let record_batch = maybe_record_batch.expect("Getting current batch");
            let col = record_batch.column(0);
            assert!(col.is_null(0));
            assert!(col.is_null(1));
            let map_entry = as_map_array(col).value(2);
            let struct_col = as_struct_array(&map_entry);
            let key_col = as_string_array(struct_col.column(0));
            assert_eq!(key_col.value(0), "three");
            assert_eq!(key_col.value(1), "four");
            assert_eq!(key_col.value(2), "five");
            assert_eq!(key_col.value(3), "six");
            assert_eq!(key_col.value(4), "seven");
            // Also verify the values column, which was written above but
            // previously never asserted
            let value_col = as_primitive_array::<Int32Type>(struct_col.column(1));
            assert_eq!(value_col.value(0), 3);
            assert_eq!(value_col.value(1), 4);
            assert_eq!(value_col.value(2), 5);
            assert_eq!(value_col.value(3), 6);
            assert_eq!(value_col.value(4), 7);
        }
    }
}