arrow_json/reader/
map_array.rs1use std::sync::Arc;
19
20use arrow_array::builder::{BooleanBufferBuilder, BufferBuilder};
21use arrow_array::{ArrayRef, MapArray, StructArray};
22use arrow_buffer::buffer::NullBuffer;
23use arrow_buffer::{ArrowNativeType, OffsetBuffer, ScalarBuffer};
24use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
25
26use crate::reader::tape::{Tape, TapeElement};
27use crate::reader::{ArrayDecoder, DecoderContext};
28
29pub struct MapArrayDecoder {
30 entries_field: FieldRef,
31 key_value_fields: Fields,
32 ordered: bool,
33 keys: Box<dyn ArrayDecoder>,
34 values: Box<dyn ArrayDecoder>,
35 ignore_type_conflicts: bool,
36 is_nullable: bool,
37}
38
39impl MapArrayDecoder {
40 pub fn new(
41 ctx: &DecoderContext,
42 data_type: &DataType,
43 is_nullable: bool,
44 ) -> Result<Self, ArrowError> {
45 let (entries_field, ordered) = match data_type {
46 DataType::Map(_, true) => {
47 return Err(ArrowError::NotYetImplemented(
48 "Decoding MapArray with sorted fields".to_string(),
49 ));
50 }
51 DataType::Map(f, ordered) => (f.clone(), *ordered),
52 _ => unreachable!(),
53 };
54
55 let key_value_fields = match entries_field.data_type() {
56 DataType::Struct(fields) if fields.len() == 2 => fields.clone(),
57 d => {
58 return Err(ArrowError::InvalidArgumentError(format!(
59 "MapArray must contain struct with two fields, got {d}"
60 )));
61 }
62 };
63
64 let keys = ctx.make_decoder(
65 key_value_fields[0].data_type(),
66 key_value_fields[0].is_nullable(),
67 )?;
68 let values = ctx.make_decoder(
69 key_value_fields[1].data_type(),
70 key_value_fields[1].is_nullable(),
71 )?;
72
73 Ok(Self {
74 entries_field,
75 key_value_fields,
76 ordered,
77 keys,
78 values,
79 ignore_type_conflicts: ctx.ignore_type_conflicts(),
80 is_nullable,
81 })
82 }
83}
84
85impl ArrayDecoder for MapArrayDecoder {
86 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
87 let mut offsets = BufferBuilder::<i32>::new(pos.len() + 1);
88 offsets.append(0);
89
90 let mut key_pos = Vec::with_capacity(pos.len());
91 let mut value_pos = Vec::with_capacity(pos.len());
92
93 let mut nulls = self
94 .is_nullable
95 .then(|| BooleanBufferBuilder::new(pos.len()));
96
97 for p in pos.iter().copied() {
98 let end_idx = match (tape.get(p), nulls.as_mut()) {
99 (TapeElement::StartObject(end_idx), None) => end_idx,
100 (TapeElement::StartObject(end_idx), Some(nulls)) => {
101 nulls.append(true);
102 end_idx
103 }
104 (TapeElement::Null, Some(nulls)) => {
105 nulls.append(false);
106 p + 1
107 }
108 (_, Some(nulls)) if self.ignore_type_conflicts => {
109 nulls.append(false);
110 p + 1
111 }
112 _ => return Err(tape.error(p, "{")),
113 };
114
115 let mut cur_idx = p + 1;
116 while cur_idx < end_idx {
117 let key = cur_idx;
118 let value = tape.next(key, "map key")?;
119 cur_idx = tape.next(value, "map value")?;
120
121 key_pos.push(key);
122 value_pos.push(value);
123 }
124
125 let offset = i32::from_usize(key_pos.len()).ok_or_else(|| {
126 ArrowError::JsonError("offset overflow decoding MapArray".to_string())
127 })?;
128 offsets.append(offset)
129 }
130
131 assert_eq!(key_pos.len(), value_pos.len());
132
133 let key_array = self.keys.decode(tape, &key_pos)?;
134 let value_array = self.values.decode(tape, &value_pos)?;
135
136 let entries = unsafe {
138 StructArray::new_unchecked_with_length(
139 self.key_value_fields.clone(),
140 vec![key_array, value_array],
141 None,
142 key_pos.len(),
143 )
144 };
145
146 let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));
147 let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets.finish())) };
149
150 let array = MapArray::try_new(
151 self.entries_field.clone(),
152 offsets,
153 entries,
154 nulls,
155 self.ordered,
156 )?;
157 Ok(Arc::new(array))
158 }
159}