arrow_json/reader/
list_array.rs1use std::marker::PhantomData;
19use std::sync::Arc;
20
21use arrow_array::builder::BooleanBufferBuilder;
22use arrow_array::{ArrayRef, GenericListArray, GenericListViewArray, OffsetSizeTrait};
23use arrow_buffer::buffer::NullBuffer;
24use arrow_buffer::{OffsetBuffer, ScalarBuffer};
25use arrow_schema::{ArrowError, DataType, FieldRef};
26
27use crate::reader::tape::{Tape, TapeElement};
28use crate::reader::{ArrayDecoder, DecoderContext};
29
30pub type ListArrayDecoder<O> = ListLikeArrayDecoder<O, false>;
31pub type ListViewArrayDecoder<O> = ListLikeArrayDecoder<O, true>;
32
33pub struct ListLikeArrayDecoder<O, const IS_VIEW: bool> {
34 field: FieldRef,
35 decoder: Box<dyn ArrayDecoder>,
36 phantom: PhantomData<O>,
37 ignore_type_conflicts: bool,
38 is_nullable: bool,
39}
40
41impl<O: OffsetSizeTrait, const IS_VIEW: bool> ListLikeArrayDecoder<O, IS_VIEW> {
42 pub fn new(
43 ctx: &DecoderContext,
44 data_type: &DataType,
45 is_nullable: bool,
46 ) -> Result<Self, ArrowError> {
47 let field = match (IS_VIEW, data_type) {
48 (false, DataType::List(f)) if !O::IS_LARGE => f,
49 (false, DataType::LargeList(f)) if O::IS_LARGE => f,
50 (true, DataType::ListView(f)) if !O::IS_LARGE => f,
51 (true, DataType::LargeListView(f)) if O::IS_LARGE => f,
52 _ => unreachable!(),
53 };
54 let decoder = ctx.make_decoder(field.data_type(), field.is_nullable())?;
55
56 Ok(Self {
57 field: field.clone(),
58 decoder,
59 phantom: Default::default(),
60 ignore_type_conflicts: ctx.ignore_type_conflicts(),
61 is_nullable,
62 })
63 }
64}
65
66impl<O: OffsetSizeTrait, const IS_VIEW: bool> ArrayDecoder for ListLikeArrayDecoder<O, IS_VIEW> {
67 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
68 let mut child_pos = Vec::with_capacity(pos.len());
69 let mut offsets = Vec::with_capacity(pos.len() + 1);
70 offsets.push(O::from_usize(0).unwrap());
71
72 let mut nulls = self
73 .is_nullable
74 .then(|| BooleanBufferBuilder::new(pos.len()));
75
76 for p in pos {
77 let end_idx = match (tape.get(*p), nulls.as_mut()) {
78 (TapeElement::StartList(end_idx), None) => end_idx,
79 (TapeElement::StartList(end_idx), Some(nulls)) => {
80 nulls.append(true);
81 end_idx
82 }
83 (TapeElement::Null, Some(nulls)) => {
84 nulls.append(false);
85 *p + 1
86 }
87 (_, Some(nulls)) if self.ignore_type_conflicts => {
88 nulls.append(false);
89 *p + 1
90 }
91 _ => return Err(tape.error(*p, "[")),
92 };
93
94 let mut cur_idx = *p + 1;
95 while cur_idx < end_idx {
96 child_pos.push(cur_idx);
97
98 cur_idx = tape.next(cur_idx, "list value")?;
100 }
101
102 let offset = O::from_usize(child_pos.len()).ok_or_else(|| {
103 ArrowError::JsonError(format!("offset overflow decoding {}ListArray", O::PREFIX))
104 })?;
105 offsets.push(offset);
106 }
107
108 let values = self.decoder.decode(tape, &child_pos)?;
109 let nulls = nulls.as_mut().map(|x| NullBuffer::new(x.finish()));
110
111 if IS_VIEW {
112 let mut sizes = Vec::with_capacity(offsets.len() - 1);
113 for i in 1..offsets.len() {
114 sizes.push(offsets[i] - offsets[i - 1]);
115 }
116 offsets.pop();
117 let array = unsafe {
119 GenericListViewArray::<O>::new_unchecked(
120 self.field.clone(),
121 ScalarBuffer::from(offsets),
122 ScalarBuffer::from(sizes),
123 values,
124 nulls,
125 )
126 };
127 Ok(Arc::new(array))
128 } else {
129 let offsets = unsafe { OffsetBuffer::<O>::new_unchecked(ScalarBuffer::from(offsets)) };
131
132 let array = GenericListArray::<O>::try_new(self.field.clone(), offsets, values, nulls)?;
133 Ok(Arc::new(array))
134 }
135 }
136}