1use std::marker::PhantomData;
19use std::sync::Arc;
20
21use arrow_array::{
22 ArrayRef, FixedSizeListArray, GenericListArray, GenericListViewArray, OffsetSizeTrait,
23};
24use arrow_buffer::{NullBufferBuilder, OffsetBuffer, ScalarBuffer};
25use arrow_schema::{ArrowError, DataType, FieldRef};
26
27use crate::reader::tape::{Tape, TapeElement};
28use crate::reader::{ArrayDecoder, DecoderContext};
29
30pub type ListArrayDecoder<O> = ListLikeArrayDecoder<O, false>;
31pub type ListViewArrayDecoder<O> = ListLikeArrayDecoder<O, true>;
32
33pub struct ListLikeArrayDecoder<O, const IS_VIEW: bool> {
34 field: FieldRef,
35 decoder: Box<dyn ArrayDecoder>,
36 phantom: PhantomData<O>,
37 ignore_type_conflicts: bool,
38 is_nullable: bool,
39}
40
41impl<O: OffsetSizeTrait, const IS_VIEW: bool> ListLikeArrayDecoder<O, IS_VIEW> {
42 pub fn new(
43 ctx: &DecoderContext,
44 data_type: &DataType,
45 is_nullable: bool,
46 ) -> Result<Self, ArrowError> {
47 let field = match (IS_VIEW, data_type) {
48 (false, DataType::List(f)) if !O::IS_LARGE => f,
49 (false, DataType::LargeList(f)) if O::IS_LARGE => f,
50 (true, DataType::ListView(f)) if !O::IS_LARGE => f,
51 (true, DataType::LargeListView(f)) if O::IS_LARGE => f,
52 _ => unreachable!(),
53 };
54 let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
55
56 Ok(Self {
57 field: field.clone(),
58 decoder,
59 phantom: Default::default(),
60 ignore_type_conflicts: ctx.ignore_type_conflicts(),
61 is_nullable,
62 })
63 }
64}
65
66impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDecoder<O, IS_VIEW> {
67 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
68 let mut child_pos = Vec::with_capacity(pos.len());
69 let mut offsets = Vec::with_capacity(pos.len() + 1);
70 offsets.push(O::from_usize(0).unwrap());
71
72 let mut nulls = self.is_nullable.then(|| NullBufferBuilder::new(pos.len()));
73
74 for p in pos {
75 let end_idx = match (tape.get(*p), nulls.as_mut()) {
76 (TapeElement::StartList(end_idx), None) => end_idx,
77 (TapeElement::StartList(end_idx), Some(nulls)) => {
78 nulls.append_non_null();
79 end_idx
80 }
81 (TapeElement::Null, Some(nulls)) => {
82 nulls.append_null();
83 *p + 1
84 }
85 (_, Some(nulls)) if self.ignore_type_conflicts => {
86 nulls.append_null();
87 *p + 1
88 }
89 _ => return Err(tape.error(*p, "[")),
90 };
91
92 let mut cur_idx = *p + 1;
93 while cur_idx < end_idx {
94 child_pos.push(cur_idx);
95
96 cur_idx = tape.next(cur_idx, "list value")?;
98 }
99
100 let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
101 ArrowError::JsonError(format!("offset overflow decoding {}ListArray", O::PREFIX))
102 })?;
103 offsets.push(offset);
104 }
105
106 let values = self.decoder.decode(tape, &child_pos)?;
107 let nulls = nulls.as_mut().and_then(|x| x.finish());
108
109 if IS_VIEW {
110 let mut sizes = Vec::with_capacity(offsets.len() - 1);
111 for i in 1..offsets.len() {
112 sizes.push(offsets[i] - offsets[i - 1]);
113 }
114 offsets.pop();
115 let array = unsafe {
117 GenericListViewArray::<O>::new_unchecked(
118 self.field.clone(),
119 ScalarBuffer::from(offsets),
120 ScalarBuffer::from(sizes),
121 values,
122 nulls,
123 )
124 };
125 Ok(Arc::new(array))
126 } else {
127 let offsets = unsafe { OffsetBuffer::<O>::new_unchecked(ScalarBuffer::from(offsets)) };
129
130 let array = GenericListArray::<O>::try_new(self.field.clone(), offsets, values, nulls)?;
131 Ok(Arc::new(array))
132 }
133 }
134}
135
136pub struct FixedSizeListArrayDecoder {
137 field: FieldRef,
138 size: i32,
139 decoder: Box<dyn ArrayDecoder>,
140 ignore_type_conflicts: bool,
141 is_nullable: bool,
142}
143
144impl FixedSizeListArrayDecoder {
145 pub fn new(
146 ctx: &DecoderContext,
147 data_type: &DataType,
148 is_nullable: bool,
149 ) -> Result<Self, ArrowError> {
150 let (field, size) = match data_type {
151 DataType::FixedSizeList(f, s) => (f, *s),
152 _ => unreachable!(),
153 };
154 let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
155
156 Ok(Self {
157 field: field.clone(),
158 size,
159 decoder,
160 ignore_type_conflicts: ctx.ignore_type_conflicts(),
161 is_nullable,
162 })
163 }
164}
165
166impl ArrayDecoder for FixedSizeListArrayDecoder {
167 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
168 let expected = self.size as usize;
169 let mut child_pos = Vec::with_capacity(pos.len() * expected);
170
171 let mut nulls = self.is_nullable.then(|| NullBufferBuilder::new(pos.len()));
172
173 for p in pos {
174 let end_idx = match (tape.get(*p), nulls.as_mut()) {
175 (TapeElement::StartList(end_idx), None) => end_idx,
176 (TapeElement::StartList(end_idx), Some(nulls)) => {
177 nulls.append_non_null();
178 end_idx
179 }
180 (TapeElement::Null, Some(nulls)) => {
181 nulls.append_null();
182 child_pos.resize(child_pos.len() + expected, 0);
183 continue;
184 }
185 (_, Some(nulls)) if self.ignore_type_conflicts => {
186 nulls.append_null();
187 child_pos.resize(child_pos.len() + expected, 0);
188 continue;
189 }
190 _ => return Err(tape.error(*p, "[")),
191 };
192
193 let child_start = child_pos.len();
194 let mut cur_idx = *p + 1;
195 while cur_idx < end_idx {
196 child_pos.push(cur_idx);
197 cur_idx = tape.next(cur_idx, "fixed-size list value")?;
198 }
199
200 let actual = child_pos.len() - child_start;
201 if actual != expected {
202 return Err(ArrowError::JsonError(format!(
203 "Incorrect number of elements for FixedSizeList, \
204 expected {expected} but got {actual}"
205 )));
206 }
207 }
208
209 let values = self.decoder.decode(tape, &child_pos)?;
210 let nulls = nulls.as_mut().and_then(|x| x.finish());
211
212 let array = FixedSizeListArray::try_new_with_length(
213 self.field.clone(),
214 self.size,
215 values,
216 nulls,
217 pos.len(),
218 )?;
219 Ok(Arc::new(array))
220 }
221}