1use crate::{fixed, null_sentinel, LengthTracker, RowConverter, Rows, SortField};
19use arrow_array::{new_null_array, Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait};
20use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
21use arrow_data::ArrayDataBuilder;
22use arrow_schema::{ArrowError, DataType, SortOptions};
23use std::{ops::Range, sync::Arc};
24
25pub fn compute_lengths<O: OffsetSizeTrait>(
26 lengths: &mut [usize],
27 rows: &Rows,
28 array: &GenericListArray<O>,
29) {
30 let shift = array.value_offsets()[0].as_usize();
31
32 let offsets = array.value_offsets().windows(2);
33 lengths
34 .iter_mut()
35 .zip(offsets)
36 .enumerate()
37 .for_each(|(idx, (length, offsets))| {
38 let start = offsets[0].as_usize() - shift;
39 let end = offsets[1].as_usize() - shift;
40 let range = array.is_valid(idx).then_some(start..end);
41 *length += encoded_len(rows, range);
42 });
43}
44
45fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
46 match range {
47 None => 1,
48 Some(range) => {
49 1 + range
50 .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
51 .sum::<usize>()
52 }
53 }
54}
55
56pub fn encode<O: OffsetSizeTrait>(
60 data: &mut [u8],
61 offsets: &mut [usize],
62 rows: &Rows,
63 opts: SortOptions,
64 array: &GenericListArray<O>,
65) {
66 let shift = array.value_offsets()[0].as_usize();
67
68 offsets
69 .iter_mut()
70 .skip(1)
71 .zip(array.value_offsets().windows(2))
72 .enumerate()
73 .for_each(|(idx, (offset, offsets))| {
74 let start = offsets[0].as_usize() - shift;
75 let end = offsets[1].as_usize() - shift;
76 let range = array.is_valid(idx).then_some(start..end);
77 let out = &mut data[*offset..];
78 *offset += encode_one(out, rows, range, opts)
79 });
80}
81
82#[inline]
83fn encode_one(
84 out: &mut [u8],
85 rows: &Rows,
86 range: Option<Range<usize>>,
87 opts: SortOptions,
88) -> usize {
89 match range {
90 None => super::variable::encode_null(out, opts),
91 Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
92 Some(range) => {
93 let mut offset = 0;
94 for i in range {
95 let row = rows.row(i);
96 offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
97 }
98 offset += super::variable::encode_empty(&mut out[offset..], opts);
99 offset
100 }
101 }
102}
103
104pub unsafe fn decode<O: OffsetSizeTrait>(
110 converter: &RowConverter,
111 rows: &mut [&[u8]],
112 field: &SortField,
113 validate_utf8: bool,
114) -> Result<GenericListArray<O>, ArrowError> {
115 let opts = field.options;
116
117 let mut values_bytes = 0;
118
119 let mut offset = 0;
120 let mut offsets = Vec::with_capacity(rows.len() + 1);
121 offsets.push(O::usize_as(0));
122
123 for row in rows.iter_mut() {
124 let mut row_offset = 0;
125 loop {
126 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
127 values_bytes += x.len();
128 });
129 if decoded <= 1 {
130 offsets.push(O::usize_as(offset));
131 break;
132 }
133 row_offset += decoded;
134 offset += 1;
135 }
136 }
137 O::from_usize(offset).expect("overflow");
138
139 let mut null_count = 0;
140 let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
141 let valid = rows[x][0] != null_sentinel(opts);
142 null_count += !valid as usize;
143 valid
144 });
145
146 let mut values_offsets = Vec::with_capacity(offset);
147 let mut values_bytes = Vec::with_capacity(values_bytes);
148 for row in rows.iter_mut() {
149 let mut row_offset = 0;
150 loop {
151 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
152 values_bytes.extend_from_slice(x)
153 });
154 row_offset += decoded;
155 if decoded <= 1 {
156 break;
157 }
158 values_offsets.push(values_bytes.len());
159 }
160 *row = &row[row_offset..];
161 }
162
163 if opts.descending {
164 values_bytes.iter_mut().for_each(|o| *o = !*o);
165 }
166
167 let mut last_value_offset = 0;
168 let mut child_rows: Vec<_> = values_offsets
169 .into_iter()
170 .map(|offset| {
171 let v = &values_bytes[last_value_offset..offset];
172 last_value_offset = offset;
173 v
174 })
175 .collect();
176
177 let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
178 assert_eq!(child.len(), 1);
179
180 let child_data = child[0].to_data();
181
182 let corrected_type = match &field.data_type {
185 DataType::List(inner_field) => DataType::List(Arc::new(
186 inner_field
187 .as_ref()
188 .clone()
189 .with_data_type(child_data.data_type().clone()),
190 )),
191 DataType::LargeList(inner_field) => DataType::LargeList(Arc::new(
192 inner_field
193 .as_ref()
194 .clone()
195 .with_data_type(child_data.data_type().clone()),
196 )),
197 _ => unreachable!(),
198 };
199
200 let builder = ArrayDataBuilder::new(corrected_type)
201 .len(rows.len())
202 .null_count(null_count)
203 .null_bit_buffer(Some(nulls.into()))
204 .add_buffer(Buffer::from_vec(offsets))
205 .add_child_data(child_data);
206
207 Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
208}
209
210pub fn compute_lengths_fixed_size_list(
211 tracker: &mut LengthTracker,
212 rows: &Rows,
213 array: &FixedSizeListArray,
214) {
215 let value_length = array.value_length().as_usize();
216 tracker.push_variable((0..array.len()).map(|idx| {
217 match array.is_valid(idx) {
218 true => {
219 1 + ((idx * value_length)..(idx + 1) * value_length)
220 .map(|child_idx| rows.row(child_idx).as_ref().len())
221 .sum::<usize>()
222 }
223 false => 1,
224 }
225 }))
226}
227
228pub fn encode_fixed_size_list(
232 data: &mut [u8],
233 offsets: &mut [usize],
234 rows: &Rows,
235 opts: SortOptions,
236 array: &FixedSizeListArray,
237) {
238 let null_sentinel = null_sentinel(opts);
239 offsets
240 .iter_mut()
241 .skip(1)
242 .enumerate()
243 .for_each(|(idx, offset)| {
244 let value_length = array.value_length().as_usize();
245 match array.is_valid(idx) {
246 true => {
247 data[*offset] = 0x01;
248 *offset += 1;
249 for child_idx in (idx * value_length)..(idx + 1) * value_length {
250 let row = rows.row(child_idx);
251 let end_offset = *offset + row.as_ref().len();
252 data[*offset..end_offset].copy_from_slice(row.as_ref());
253 *offset = end_offset;
254 }
255 }
256 false => {
257 data[*offset] = null_sentinel;
258 *offset += 1;
259 }
260 };
261 })
262}
263
264pub unsafe fn decode_fixed_size_list(
270 converter: &RowConverter,
271 rows: &mut [&[u8]],
272 field: &SortField,
273 validate_utf8: bool,
274 value_length: usize,
275) -> Result<FixedSizeListArray, ArrowError> {
276 let list_type = &field.data_type;
277 let element_type = match list_type {
278 DataType::FixedSizeList(element_field, _) => element_field.data_type(),
279 _ => {
280 return Err(ArrowError::InvalidArgumentError(format!(
281 "Expected FixedSizeListArray, found: {list_type:?}",
282 )))
283 }
284 };
285
286 let len = rows.len();
287 let (null_count, nulls) = fixed::decode_nulls(rows);
288
289 let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?;
290 let null_element_encoded = null_element_encoded.row(0);
291 let null_element_slice = null_element_encoded.as_ref();
292
293 let mut child_rows = Vec::new();
294 for row in rows {
295 let valid = row[0] == 1;
296 let mut row_offset = 1;
297 if !valid {
298 for _ in 0..value_length {
299 child_rows.push(null_element_slice);
300 }
301 } else {
302 for _ in 0..value_length {
303 let mut temp_child_rows = vec![&row[row_offset..]];
304 converter.convert_raw(&mut temp_child_rows, validate_utf8)?;
305 let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
306 let next_offset = row_offset + decoded_bytes;
307 child_rows.push(&row[row_offset..next_offset]);
308 row_offset = next_offset;
309 }
310 }
311 *row = &row[row_offset..]; }
313
314 let children = converter.convert_raw(&mut child_rows, validate_utf8)?;
315 let child_data = children.iter().map(|c| c.to_data()).collect();
316 let builder = ArrayDataBuilder::new(list_type.clone())
317 .len(len)
318 .null_count(null_count)
319 .null_bit_buffer(Some(nulls))
320 .child_data(child_data);
321
322 Ok(FixedSizeListArray::from(builder.build_unchecked()))
323}