Skip to main content

arrow_json/reader/
list_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use arrow_array::{
22    ArrayRef, FixedSizeListArray, GenericListArray, GenericListViewArray, OffsetSizeTrait,
23};
24use arrow_buffer::{NullBufferBuilder, OffsetBuffer, ScalarBuffer};
25use arrow_schema::{ArrowError, DataType, FieldRef};
26
27use crate::reader::tape::{Tape, TapeElement};
28use crate::reader::{ArrayDecoder, DecoderContext};
29
30pub type ListArrayDecoder<O> = ListLikeArrayDecoder<O, false>;
31pub type ListViewArrayDecoder<O> = ListLikeArrayDecoder<O, true>;
32
33pub struct ListLikeArrayDecoder<O, const IS_VIEW: bool> {
34    field: FieldRef,
35    decoder: Box<dyn ArrayDecoder>,
36    phantom: PhantomData<O>,
37    ignore_type_conflicts: bool,
38    is_nullable: bool,
39}
40
41impl<O: OffsetSizeTrait, const IS_VIEW: bool> ListLikeArrayDecoder<O, IS_VIEW> {
42    pub fn new(
43        ctx: &DecoderContext,
44        data_type: &DataType,
45        is_nullable: bool,
46    ) -> Result<Self, ArrowError> {
47        let field = match (IS_VIEW, data_type) {
48            (false, DataType::List(f)) if !O::IS_LARGE => f,
49            (false, DataType::LargeList(f)) if O::IS_LARGE => f,
50            (true, DataType::ListView(f)) if !O::IS_LARGE => f,
51            (true, DataType::LargeListView(f)) if O::IS_LARGE => f,
52            _ => unreachable!(),
53        };
54        let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
55
56        Ok(Self {
57            field: field.clone(),
58            decoder,
59            phantom: Default::default(),
60            ignore_type_conflicts: ctx.ignore_type_conflicts(),
61            is_nullable,
62        })
63    }
64}
65
66impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDecoder<O, IS_VIEW> {
67    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
68        let mut child_pos = Vec::with_capacity(pos.len());
69        let mut offsets = Vec::with_capacity(pos.len() + 1);
70        offsets.push(O::from_usize(0).unwrap());
71
72        let mut nulls = self.is_nullable.then(|| NullBufferBuilder::new(pos.len()));
73
74        for p in pos {
75            let end_idx = match (tape.get(*p), nulls.as_mut()) {
76                (TapeElement::StartList(end_idx), None) => end_idx,
77                (TapeElement::StartList(end_idx), Some(nulls)) => {
78                    nulls.append_non_null();
79                    end_idx
80                }
81                (TapeElement::Null, Some(nulls)) => {
82                    nulls.append_null();
83                    *p + 1
84                }
85                (_, Some(nulls)) if self.ignore_type_conflicts => {
86                    nulls.append_null();
87                    *p + 1
88                }
89                _ => return Err(tape.error(*p, "[")),
90            };
91
92            let mut cur_idx = *p + 1;
93            while cur_idx < end_idx {
94                child_pos.push(cur_idx);
95
96                // Advance to next field
97                cur_idx = tape.next(cur_idx, "list value")?;
98            }
99
100            let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
101                ArrowError::JsonError(format!("offset overflow decoding {}ListArray", O::PREFIX))
102            })?;
103            offsets.push(offset);
104        }
105
106        let values = self.decoder.decode(tape, &child_pos)?;
107        let nulls = nulls.as_mut().and_then(|x| x.finish());
108
109        if IS_VIEW {
110            let mut sizes = Vec::with_capacity(offsets.len() - 1);
111            for i in 1..offsets.len() {
112                sizes.push(offsets[i] - offsets[i - 1]);
113            }
114            offsets.pop();
115            // SAFETY: offsets and sizes are constructed correctly from the tape
116            let array = unsafe {
117                GenericListViewArray::<O>::new_unchecked(
118                    self.field.clone(),
119                    ScalarBuffer::from(offsets),
120                    ScalarBuffer::from(sizes),
121                    values,
122                    nulls,
123                )
124            };
125            Ok(Arc::new(array))
126        } else {
127            // SAFETY: offsets are built monotonically starting from 0
128            let offsets = unsafe { OffsetBuffer::<O>::new_unchecked(ScalarBuffer::from(offsets)) };
129
130            let array = GenericListArray::<O>::try_new(self.field.clone(), offsets, values, nulls)?;
131            Ok(Arc::new(array))
132        }
133    }
134}
135
136pub struct FixedSizeListArrayDecoder {
137    field: FieldRef,
138    size: i32,
139    decoder: Box<dyn ArrayDecoder>,
140    ignore_type_conflicts: bool,
141    is_nullable: bool,
142}
143
144impl FixedSizeListArrayDecoder {
145    pub fn new(
146        ctx: &DecoderContext,
147        data_type: &DataType,
148        is_nullable: bool,
149    ) -> Result<Self, ArrowError> {
150        let (field, size) = match data_type {
151            DataType::FixedSizeList(f, s) => (f, *s),
152            _ => unreachable!(),
153        };
154        let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
155
156        Ok(Self {
157            field: field.clone(),
158            size,
159            decoder,
160            ignore_type_conflicts: ctx.ignore_type_conflicts(),
161            is_nullable,
162        })
163    }
164}
165
166impl ArrayDecoder for FixedSizeListArrayDecoder {
167    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
168        let expected = self.size as usize;
169        let mut child_pos = Vec::with_capacity(pos.len() * expected);
170
171        let mut nulls = self.is_nullable.then(|| NullBufferBuilder::new(pos.len()));
172
173        for p in pos {
174            let end_idx = match (tape.get(*p), nulls.as_mut()) {
175                (TapeElement::StartList(end_idx), None) => end_idx,
176                (TapeElement::StartList(end_idx), Some(nulls)) => {
177                    nulls.append_non_null();
178                    end_idx
179                }
180                (TapeElement::Null, Some(nulls)) => {
181                    nulls.append_null();
182                    child_pos.resize(child_pos.len() + expected, 0);
183                    continue;
184                }
185                (_, Some(nulls)) if self.ignore_type_conflicts => {
186                    nulls.append_null();
187                    child_pos.resize(child_pos.len() + expected, 0);
188                    continue;
189                }
190                _ => return Err(tape.error(*p, "[")),
191            };
192
193            let child_start = child_pos.len();
194            let mut cur_idx = *p + 1;
195            while cur_idx < end_idx {
196                child_pos.push(cur_idx);
197                cur_idx = tape.next(cur_idx, "fixed-size list value")?;
198            }
199
200            let actual = child_pos.len() - child_start;
201            if actual != expected {
202                return Err(ArrowError::JsonError(format!(
203                    "Incorrect number of elements for FixedSizeList, \
204                     expected {expected} but got {actual}"
205                )));
206            }
207        }
208
209        let values = self.decoder.decode(tape, &child_pos)?;
210        let nulls = nulls.as_mut().and_then(|x| x.finish());
211
212        let array = FixedSizeListArray::try_new_with_length(
213            self.field.clone(),
214            self.size,
215            values,
216            nulls,
217            pos.len(),
218        )?;
219        Ok(Arc::new(array))
220    }
221}