Skip to main content

arrow_json/reader/
map_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
21use arrow_array::{ArrayRef, MapArray, StructArray};
22use arrow_buffer::buffer::NullBuffer;
23use arrow_buffer::{ArrowNativeType, OffsetBuffer, ScalarBuffer};
24use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
25
26use crate::reader::tape::{Tape, TapeElement};
27use crate::reader::{ArrayDecoder, DecoderContext};
28
/// Decodes JSON objects into an Arrow [`MapArray`].
///
/// Every member of a JSON object becomes one key/value entry of the map slot
/// for that object; keys and values are decoded by two separate child
/// decoders.
pub struct MapArrayDecoder {
    /// The map's entries field, whose data type is a two-field struct.
    entries_field: FieldRef,
    /// The two child fields of the entries struct: index 0 is the key,
    /// index 1 is the value.
    key_value_fields: Fields,
    /// The map's "keys are sorted" flag. `new` rejects sorted maps, so this is
    /// always `false` for a successfully constructed decoder.
    ordered: bool,
    /// Decoder for the map keys, built from the first entries field.
    keys: Box<dyn ArrayDecoder>,
    /// Decoder for the map values, built from the second entries field.
    values: Box<dyn ArrayDecoder>,
    /// When set and the map is nullable, JSON values that are neither objects
    /// nor `null` decode as a null map slot instead of producing an error.
    ignore_type_conflicts: bool,
    /// Whether the map column itself may contain nulls; when `false` no
    /// validity bitmap is built and a JSON `null` is an error.
    is_nullable: bool,
}
38
39impl MapArrayDecoder {
40    pub fn new(
41        ctx: &DecoderContext,
42        data_type: &DataType,
43        is_nullable: bool,
44    ) -> Result<Self, ArrowError> {
45        let (entries_field, ordered) = match data_type {
46            DataType::Map(_, true) => {
47                return Err(ArrowError::NotYetImplemented(
48                    "Decoding MapArray with sorted fields".to_string(),
49                ));
50            }
51            DataType::Map(f, ordered) => (f.clone(), *ordered),
52            _ => unreachable!(),
53        };
54
55        let key_value_fields = match entries_field.data_type() {
56            DataType::Struct(fields) if fields.len() == 2 => fields.clone(),
57            d => {
58                return Err(ArrowError::InvalidArgumentError(format!(
59                    "MapArray must contain struct with two fields, got {d}"
60                )));
61            }
62        };
63
64        let keys = ctx.make_decoder(
65            key_value_fields[0].data_type(),
66            key_value_fields[0].is_nullable(),
67        )?;
68        let values = ctx.make_decoder(
69            key_value_fields[1].data_type(),
70            key_value_fields[1].is_nullable(),
71        )?;
72
73        Ok(Self {
74            entries_field,
75            key_value_fields,
76            ordered,
77            keys,
78            values,
79            ignore_type_conflicts: ctx.ignore_type_conflicts(),
80            is_nullable,
81        })
82    }
83}
84
85impl ArrayDecoder for MapArrayDecoder {
86    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
87        let mut offsets = BufferBuilder::<i32>::new(pos.len() + 1);
88        offsets.append(0);
89
90        let mut key_pos = Vec::with_capacity(pos.len());
91        let mut value_pos = Vec::with_capacity(pos.len());
92
93        let mut nulls = self
94            .is_nullable
95            .then(|| BooleanBufferBuilder::new(pos.len()));
96
97        for p in pos.iter().copied() {
98            let end_idx = match (tape.get(p), nulls.as_mut()) {
99                (TapeElement::StartObject(end_idx), None) => end_idx,
100                (TapeElement::StartObject(end_idx), Some(nulls)) => {
101                    nulls.append(true);
102                    end_idx
103                }
104                (TapeElement::Null, Some(nulls)) => {
105                    nulls.append(false);
106                    p + 1
107                }
108                (_, Some(nulls)) if self.ignore_type_conflicts => {
109                    nulls.append(false);
110                    p + 1
111                }
112                _ => return Err(tape.error(p, "{")),
113            };
114
115            let mut cur_idx = p + 1;
116            while cur_idx < end_idx {
117                let key = cur_idx;
118                let value = tape.next(key, "map key")?;
119                cur_idx = tape.next(value, "map value")?;
120
121                key_pos.push(key);
122                value_pos.push(value);
123            }
124
125            let offset = i32::from_usize(key_pos.len()).ok_or_else(|| {
126                ArrowError::JsonError("offset overflow decoding MapArray".to_string())
127            })?;
128            offsets.append(offset)
129        }
130
131        assert_eq!(key_pos.len(), value_pos.len());
132
133        let key_array = self.keys.decode(tape, &key_pos)?;
134        let value_array = self.values.decode(tape, &value_pos)?;
135
136        // SAFETY: fields/arrays match the schema, lengths are equal, no nulls
137        let entries = unsafe {
138            StructArray::new_unchecked_with_length(
139                self.key_value_fields.clone(),
140                vec![key_array, value_array],
141                None,
142                key_pos.len(),
143            )
144        };
145
146        let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));
147        // SAFETY: offsets are built monotonically starting from 0
148        let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets.finish())) };
149
150        let array = MapArray::try_new(
151            self.entries_field.clone(),
152            offsets,
153            entries,
154            nulls,
155            self.ordered,
156        )?;
157        Ok(Arc::new(array))
158    }
159}