parquet/arrow/array_reader/map_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{ArrayReader, ListArrayReader, StructArrayReader};
use crate::errors::Result;
use arrow_array::{Array, ArrayRef, MapArray};
use arrow_schema::DataType as ArrowType;
use std::any::Any;
use std::sync::Arc;

/// Implementation of a map array reader.
pub struct MapArrayReader {
    /// The Arrow map type produced by this reader
    data_type: ArrowType,
    /// The underlying list reader, as a map is read as a list of key/value structs
    reader: ListArrayReader<i32>,
}

impl MapArrayReader {
    /// Creates a new [`MapArrayReader`] with a `def_level`, `rep_level` and `nullable`
    /// as defined on [`ParquetField`][crate::arrow::schema::ParquetField]
    pub fn new(
        key_reader: Box<dyn ArrayReader>,
        value_reader: Box<dyn ArrayReader>,
        data_type: ArrowType,
        def_level: i16,
        rep_level: i16,
        nullable: bool,
    ) -> Self {
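        // The key/value entries are encoded as a repeated group nested inside the
        // map group: they sit one repetition level and one definition level below
        // the map itself, plus one further definition level when the map is
        // nullable (present vs. null).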
        let struct_def_level = match nullable {
            true => def_level + 2,
            false => def_level + 1,
        };
        let struct_rep_level = rep_level + 1;

        let element = match &data_type {
            ArrowType::Map(element, _) => match element.data_type() {
                ArrowType::Struct(fields) if fields.len() == 2 => {
                    // Parquet cannot represent nullability at this level (#1697)
                    // and so encountering nullability here indicates some manner
                    // of schema inconsistency / inference bug
                    assert!(!element.is_nullable(), "map struct cannot be nullable");
                    element
                }
                _ => unreachable!("expected struct with two fields"),
            },
            _ => unreachable!("expected map type"),
        };

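        // Combine the key and value readers into a single reader for the
        // (non-nullable) key/value entries struct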
        let struct_reader = StructArrayReader::new(
            element.data_type().clone(),
            vec![key_reader, value_reader],
            struct_def_level,
            struct_rep_level,
            false,
        );

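        // Wrap the entries struct in a list reader: a map shares the offset and
        // validity layout of a list of key/value structs, so the list reader
        // handles repetition and nullability of the map itself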
        let reader = ListArrayReader::new(
            Box::new(struct_reader),
            ArrowType::List(element.clone()),
            def_level,
            rep_level,
            nullable,
        );

        Self { data_type, reader }
    }
}

impl ArrayReader for MapArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        self.reader.read_records(batch_size)
    }

    fn consume_batch(&mut self) -> Result<ArrayRef> {
        // A MapArray is just a ListArray with a StructArray child
        // we can therefore just alter the ArrayData
        let array = self.reader.consume_batch()?;
        let data = array.to_data();
        let builder = data.into_builder().data_type(self.data_type.clone());

        // SAFETY - we can assume that ListArrayReader produces valid ListArray
        // of the expected type, and as such its output can be reinterpreted as
        // a MapArray without validation
        Ok(Arc::new(MapArray::from(unsafe {
            builder.build_unchecked()
        })))
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        self.reader.skip_records(num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.reader.get_def_levels()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.reader.get_rep_levels()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use crate::arrow::ArrowWriter;
    use arrow::datatypes::{Field, Int32Type, Schema};
    use arrow_array::builder::{MapBuilder, PrimitiveBuilder, StringBuilder};
    use arrow_array::cast::*;
    use arrow_array::RecordBatch;
    use arrow_schema::Fields;
    use bytes::Bytes;

    #[test]
    // This test writes a parquet file with the following data:
    // +--------------------------------------------------------+
    // |map                                                     |
    // +--------------------------------------------------------+
    // |null                                                    |
    // |null                                                    |
    // |{three -> 3, four -> 4, five -> 5, six -> 6, seven -> 7}|
    // +--------------------------------------------------------+
    //
    // It then attempts to read the data back and checks that the third record
    // contains the expected values.
    fn read_map_array_column() {
        // Schema for single map of string to int32
        let schema = Schema::new(vec![Field::new(
            "map",
            ArrowType::Map(
                Arc::new(Field::new(
                    "entries",
                    ArrowType::Struct(Fields::from(vec![
                        Field::new("keys", ArrowType::Utf8, false),
                        Field::new("values", ArrowType::Int32, true),
                    ])),
                    false,
                )),
                false, // Map field not sorted
            ),
            true,
        )]);

        // Create builders for map
        let string_builder = StringBuilder::new();
        let ints_builder: PrimitiveBuilder<Int32Type> = PrimitiveBuilder::new();
        let mut map_builder = MapBuilder::new(None, string_builder, ints_builder);

        // Add two null records and one record with five entries
        map_builder.append(false).expect("adding null map entry");
        map_builder.append(false).expect("adding null map entry");
        map_builder.keys().append_value("three");
        map_builder.keys().append_value("four");
        map_builder.keys().append_value("five");
        map_builder.keys().append_value("six");
        map_builder.keys().append_value("seven");

        map_builder.values().append_value(3);
        map_builder.values().append_value(4);
        map_builder.values().append_value(5);
        map_builder.values().append_value(6);
        map_builder.values().append_value(7);
        map_builder.append(true).expect("adding map entry");

        // Create record batch
        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(map_builder.finish())])
            .expect("create record batch");

        // Write record batch to file
        let mut buffer = Vec::with_capacity(1024);
        let mut writer =
            ArrowWriter::try_new(&mut buffer, batch.schema(), None).expect("create file writer");
        writer.write(&batch).expect("writing file");
        writer.close().expect("close writer");

        // Read file
        let reader = Bytes::from(buffer);
        let record_batch_reader = ParquetRecordBatchReader::try_new(reader, 1024).unwrap();
        for maybe_record_batch in record_batch_reader {
            let record_batch = maybe_record_batch.expect("Getting current batch");
            let col = record_batch.column(0);
            assert!(col.is_null(0));
            assert!(col.is_null(1));
            let map_entry = as_map_array(col).value(2);
            let struct_col = as_struct_array(&map_entry);
            let key_col = as_string_array(struct_col.column(0)); // Key column
            assert_eq!(key_col.value(0), "three");
            assert_eq!(key_col.value(1), "four");
            assert_eq!(key_col.value(2), "five");
            assert_eq!(key_col.value(3), "six");
            assert_eq!(key_col.value(4), "seven");
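            // Also check the value column, which should mirror the values
            // written above (3 through 7)
            let value_col = as_primitive_array::<Int32Type>(struct_col.column(1)); // Value column
            assert_eq!(value_col.value(0), 3);
            assert_eq!(value_col.value(1), 4);
            assert_eq!(value_col.value(2), 5);
            assert_eq!(value_col.value(3), 6);
            assert_eq!(value_col.value(4), 7);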
        }
    }
}