Skip to main content

arrow_json/reader/
list_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use arrow_array::builder::BooleanBufferBuilder;
22use arrow_array::{ArrayRef, GenericListArray, GenericListViewArray, OffsetSizeTrait};
23use arrow_buffer::buffer::NullBuffer;
24use arrow_buffer::{OffsetBuffer, ScalarBuffer};
25use arrow_schema::{ArrowError, DataType, FieldRef};
26
27use crate::reader::tape::{Tape, TapeElement};
28use crate::reader::{ArrayDecoder, DecoderContext};
29
30pub type ListArrayDecoder<O> = ListLikeArrayDecoder<O, false>;
31pub type ListViewArrayDecoder<O> = ListLikeArrayDecoder<O, true>;
32
33pub struct ListLikeArrayDecoder<O, const IS_VIEW: bool> {
34    field: FieldRef,
35    decoder: Box<dyn ArrayDecoder>,
36    phantom: PhantomData<O>,
37    ignore_type_conflicts: bool,
38    is_nullable: bool,
39}
40
41impl<O: OffsetSizeTrait, const IS_VIEW: bool> ListLikeArrayDecoder<O, IS_VIEW> {
42    pub fn new(
43        ctx: &DecoderContext,
44        data_type: &DataType,
45        is_nullable: bool,
46    ) -> Result<Self, ArrowError> {
47        let field = match (IS_VIEW, data_type) {
48            (false, DataType::List(f)) if !O::IS_LARGE => f,
49            (false, DataType::LargeList(f)) if O::IS_LARGE => f,
50            (true, DataType::ListView(f)) if !O::IS_LARGE => f,
51            (true, DataType::LargeListView(f)) if O::IS_LARGE => f,
52            _ => unreachable!(),
53        };
54        let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
55
56        Ok(Self {
57            field: field.clone(),
58            decoder,
59            phantom: Default::default(),
60            ignore_type_conflicts: ctx.ignore_type_conflicts(),
61            is_nullable,
62        })
63    }
64}
65
66impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDecoder<O, IS_VIEW> {
67    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
68        let mut child_pos = Vec::with_capacity(pos.len());
69        let mut offsets = Vec::with_capacity(pos.len() + 1);
70        offsets.push(O::from_usize(0).unwrap());
71
72        let mut nulls = self
73            .is_nullable
74            .then(|| BooleanBufferBuilder::new(pos.len()));
75
76        for p in pos {
77            let end_idx = match (tape.get(*p), nulls.as_mut()) {
78                (TapeElement::StartList(end_idx), None) => end_idx,
79                (TapeElement::StartList(end_idx), Some(nulls)) => {
80                    nulls.append(true);
81                    end_idx
82                }
83                (TapeElement::Null, Some(nulls)) => {
84                    nulls.append(false);
85                    *p + 1
86                }
87                (_, Some(nulls)) if self.ignore_type_conflicts => {
88                    nulls.append(false);
89                    *p + 1
90                }
91                _ => return Err(tape.error(*p, "[")),
92            };
93
94            let mut cur_idx = *p + 1;
95            while cur_idx < end_idx {
96                child_pos.push(cur_idx);
97
98                // Advance to next field
99                cur_idx = tape.next(cur_idx, "list value")?;
100            }
101
102            let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
103                ArrowError::JsonError(format!("offset overflow decoding {}ListArray", O::PREFIX))
104            })?;
105            offsets.push(offset);
106        }
107
108        let values = self.decoder.decode(tape, &child_pos)?;
109        let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));
110
111        if IS_VIEW {
112            let mut sizes = Vec::with_capacity(offsets.len() - 1);
113            for i in 1..offsets.len() {
114                sizes.push(offsets[i] - offsets[i - 1]);
115            }
116            offsets.pop();
117            // SAFETY: offsets and sizes are constructed correctly from the tape
118            let array = unsafe {
119                GenericListViewArray::<O>::new_unchecked(
120                    self.field.clone(),
121                    ScalarBuffer::from(offsets),
122                    ScalarBuffer::from(sizes),
123                    values,
124                    nulls,
125                )
126            };
127            Ok(Arc::new(array))
128        } else {
129            // SAFETY: offsets are built monotonically starting from 0
130            let offsets = unsafe { OffsetBuffer::<O>::new_unchecked(ScalarBuffer::from(offsets)) };
131
132            let array = GenericListArray::<O>::try_new(self.field.clone(), offsets, values, nulls)?;
133            Ok(Arc::new(array))
134        }
135    }
136}