1use crate::{LengthTracker, RowConverter, Rows, SortField, fixed, null_sentinel};
19use arrow_array::{
20 Array, FixedSizeListArray, GenericListArray, GenericListViewArray, OffsetSizeTrait,
21 new_null_array,
22};
23use arrow_buffer::{
24 ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
25};
26use arrow_data::ArrayDataBuilder;
27use arrow_schema::{ArrowError, DataType, SortOptions};
28use std::{ops::Range, sync::Arc};
29
30pub fn compute_lengths<O: OffsetSizeTrait>(
31 lengths: &mut [usize],
32 rows: &Rows,
33 array: &GenericListArray<O>,
34) {
35 let shift = array.value_offsets()[0].as_usize();
36
37 lengths
38 .iter_mut()
39 .zip(array.value_offsets().windows(2))
40 .enumerate()
41 .for_each(|(idx, (length, offsets))| {
42 let start = offsets[0].as_usize() - shift;
43 let end = offsets[1].as_usize() - shift;
44 let range = array.is_valid(idx).then_some(start..end);
45 *length += list_element_encoded_len(rows, range);
46 });
47}
48
49pub fn encode<O: OffsetSizeTrait>(
53 data: &mut [u8],
54 offsets: &mut [usize],
55 rows: &Rows,
56 opts: SortOptions,
57 array: &GenericListArray<O>,
58) {
59 let shift = array.value_offsets()[0].as_usize();
60
61 offsets
62 .iter_mut()
63 .skip(1)
64 .zip(array.value_offsets().windows(2))
65 .enumerate()
66 .for_each(|(idx, (offset, offsets))| {
67 let start = offsets[0].as_usize() - shift;
68 let end = offsets[1].as_usize() - shift;
69 let range = array.is_valid(idx).then_some(start..end);
70 let out = &mut data[*offset..];
71 *offset += encode_one(out, rows, range, opts)
72 });
73}
74
75#[inline]
76fn encode_one(
77 out: &mut [u8],
78 rows: &Rows,
79 range: Option<Range<usize>>,
80 opts: SortOptions,
81) -> usize {
82 match range {
83 None => super::variable::encode_null(out, opts),
84 Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
85 Some(range) => {
86 let mut offset = 0;
87 for i in range {
88 let row = rows.row(i);
89 offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
90 }
91 offset += super::variable::encode_empty(&mut out[offset..], opts);
92 offset
93 }
94 }
95}
96
97pub unsafe fn decode<O: OffsetSizeTrait>(
103 converter: &RowConverter,
104 rows: &mut [&[u8]],
105 field: &SortField,
106 validate_utf8: bool,
107) -> Result<GenericListArray<O>, ArrowError> {
108 let opts = field.options;
109
110 let mut values_bytes = 0;
111
112 let mut offset = 0;
113 let mut offsets = Vec::with_capacity(rows.len() + 1);
114 offsets.push(O::usize_as(0));
115
116 for row in rows.iter_mut() {
117 let mut row_offset = 0;
118 loop {
119 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
120 values_bytes += x.len();
121 });
122 if decoded <= 1 {
123 offsets.push(O::usize_as(offset));
124 break;
125 }
126 row_offset += decoded;
127 offset += 1;
128 }
129 }
130 O::from_usize(offset).expect("overflow");
131
132 let mut null_count = 0;
133 let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
134 let valid = rows[x][0] != null_sentinel(opts);
135 null_count += !valid as usize;
136 valid
137 });
138
139 let mut values_offsets = Vec::with_capacity(offset);
140 let mut values_bytes = Vec::with_capacity(values_bytes);
141 for row in rows.iter_mut() {
142 let mut row_offset = 0;
143 loop {
144 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
145 values_bytes.extend_from_slice(x)
146 });
147 row_offset += decoded;
148 if decoded <= 1 {
149 break;
150 }
151 values_offsets.push(values_bytes.len());
152 }
153 *row = &row[row_offset..];
154 }
155
156 if opts.descending {
157 values_bytes.iter_mut().for_each(|o| *o = !*o);
158 }
159
160 let mut last_value_offset = 0;
161 let mut child_rows: Vec<_> = values_offsets
162 .into_iter()
163 .map(|offset| {
164 let v = &values_bytes[last_value_offset..offset];
165 last_value_offset = offset;
166 v
167 })
168 .collect();
169
170 let child = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
171 assert_eq!(child.len(), 1);
172
173 let child_data = child[0].to_data();
174
175 let corrected_type = match &field.data_type {
178 DataType::List(inner_field) => DataType::List(Arc::new(
179 inner_field
180 .as_ref()
181 .clone()
182 .with_data_type(child_data.data_type().clone()),
183 )),
184 DataType::LargeList(inner_field) => DataType::LargeList(Arc::new(
185 inner_field
186 .as_ref()
187 .clone()
188 .with_data_type(child_data.data_type().clone()),
189 )),
190 _ => unreachable!(),
191 };
192
193 let builder = ArrayDataBuilder::new(corrected_type)
194 .len(rows.len())
195 .null_count(null_count)
196 .null_bit_buffer(Some(nulls.into()))
197 .add_buffer(Buffer::from_vec(offsets))
198 .add_child_data(child_data);
199
200 Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
201}
202
203pub fn compute_lengths_fixed_size_list(
204 tracker: &mut LengthTracker,
205 rows: &Rows,
206 array: &FixedSizeListArray,
207) {
208 let value_length = array.value_length().as_usize();
209 tracker.push_variable((0..array.len()).map(|idx| {
210 match array.is_valid(idx) {
211 true => {
212 1 + ((idx * value_length)..(idx + 1) * value_length)
213 .map(|child_idx| rows.row(child_idx).as_ref().len())
214 .sum::<usize>()
215 }
216 false => 1,
217 }
218 }))
219}
220
221pub fn encode_fixed_size_list(
225 data: &mut [u8],
226 offsets: &mut [usize],
227 rows: &Rows,
228 opts: SortOptions,
229 array: &FixedSizeListArray,
230) {
231 let null_sentinel = null_sentinel(opts);
232 offsets
233 .iter_mut()
234 .skip(1)
235 .enumerate()
236 .for_each(|(idx, offset)| {
237 let value_length = array.value_length().as_usize();
238 match array.is_valid(idx) {
239 true => {
240 data[*offset] = 0x01;
241 *offset += 1;
242 for child_idx in (idx * value_length)..(idx + 1) * value_length {
243 let row = rows.row(child_idx);
244 let end_offset = *offset + row.as_ref().len();
245 data[*offset..end_offset].copy_from_slice(row.as_ref());
246 *offset = end_offset;
247 }
248 }
249 false => {
250 data[*offset] = null_sentinel;
251 *offset += 1;
252 }
253 };
254 })
255}
256
257pub unsafe fn decode_fixed_size_list(
263 converter: &RowConverter,
264 rows: &mut [&[u8]],
265 field: &SortField,
266 validate_utf8: bool,
267 value_length: usize,
268) -> Result<FixedSizeListArray, ArrowError> {
269 let list_type = &field.data_type;
270 let element_type = match list_type {
271 DataType::FixedSizeList(element_field, _) => element_field.data_type(),
272 _ => {
273 return Err(ArrowError::InvalidArgumentError(format!(
274 "Expected FixedSizeListArray, found: {list_type}",
275 )));
276 }
277 };
278
279 let len = rows.len();
280 let (null_count, nulls) = fixed::decode_nulls(rows);
281
282 let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?;
283 let null_element_encoded = null_element_encoded.row(0);
284 let null_element_slice = null_element_encoded.as_ref();
285
286 let mut child_rows = Vec::new();
287 for row in rows {
288 let valid = row[0] == 1;
289 let mut row_offset = 1;
290 if !valid {
291 for _ in 0..value_length {
292 child_rows.push(null_element_slice);
293 }
294 } else {
295 for _ in 0..value_length {
296 let mut temp_child_rows = vec![&row[row_offset..]];
297 unsafe { converter.convert_raw(&mut temp_child_rows, validate_utf8) }?;
298 let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
299 let next_offset = row_offset + decoded_bytes;
300 child_rows.push(&row[row_offset..next_offset]);
301 row_offset = next_offset;
302 }
303 }
304 *row = &row[row_offset..]; }
306
307 let children = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
308 let child_data = children.iter().map(|c| c.to_data()).collect();
309 let builder = ArrayDataBuilder::new(list_type.clone())
310 .len(len)
311 .null_count(null_count)
312 .null_bit_buffer(Some(nulls))
313 .child_data(child_data);
314
315 Ok(FixedSizeListArray::from(unsafe {
316 builder.build_unchecked()
317 }))
318}
319
320#[inline]
326fn list_element_encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
327 match range {
328 None => 1,
329 Some(range) => {
330 1 + range
331 .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
332 .sum::<usize>()
333 }
334 }
335}
336
337pub fn compute_lengths_list_view<O: OffsetSizeTrait>(
341 lengths: &mut [usize],
342 rows: &Rows,
343 array: &GenericListViewArray<O>,
344 shift: usize,
345) {
346 let offsets = array.value_offsets();
347 let sizes = array.value_sizes();
348
349 lengths.iter_mut().enumerate().for_each(|(idx, length)| {
350 let size = sizes[idx].as_usize();
351 let range = array.is_valid(idx).then(|| {
352 let start = if size > 0 {
355 offsets[idx].as_usize() - shift
356 } else {
357 0
358 };
359 start..start + size
360 });
361 *length += list_element_encoded_len(rows, range);
362 });
363}
364
365pub fn encode_list_view<O: OffsetSizeTrait>(
369 data: &mut [u8],
370 out_offsets: &mut [usize],
371 rows: &Rows,
372 opts: SortOptions,
373 array: &GenericListViewArray<O>,
374 shift: usize,
375) {
376 let offsets = array.value_offsets();
377 let sizes = array.value_sizes();
378
379 out_offsets
380 .iter_mut()
381 .skip(1)
382 .enumerate()
383 .for_each(|(idx, offset)| {
384 let size = sizes[idx].as_usize();
385 let range = array.is_valid(idx).then(|| {
386 let start = if size > 0 {
389 offsets[idx].as_usize() - shift
390 } else {
391 0
392 };
393 start..start + size
394 });
395 let out = &mut data[*offset..];
396 *offset += encode_one(out, rows, range, opts)
397 });
398}
399
400pub unsafe fn decode_list_view<O: OffsetSizeTrait>(
406 converter: &RowConverter,
407 rows: &mut [&[u8]],
408 field: &SortField,
409 validate_utf8: bool,
410) -> Result<GenericListViewArray<O>, ArrowError> {
411 let opts = field.options;
412
413 let mut values_bytes = 0;
414
415 let mut child_count = 0usize;
416 let mut list_sizes: Vec<O> = Vec::with_capacity(rows.len());
417
418 for row in rows.iter_mut() {
420 let mut row_offset = 0;
421 let mut list_size = 0usize;
422 loop {
423 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
424 values_bytes += x.len();
425 });
426 if decoded <= 1 {
427 list_sizes.push(O::usize_as(list_size));
428 break;
429 }
430 row_offset += decoded;
431 child_count += 1;
432 list_size += 1;
433 }
434 }
435 O::from_usize(child_count).expect("overflow");
436
437 let mut null_count = 0;
438 let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
439 let valid = rows[x][0] != null_sentinel(opts);
440 null_count += !valid as usize;
441 valid
442 });
443
444 let mut values_offsets_vec = Vec::with_capacity(child_count);
445 let mut values_bytes = Vec::with_capacity(values_bytes);
446 for row in rows.iter_mut() {
447 let mut row_offset = 0;
448 loop {
449 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
450 values_bytes.extend_from_slice(x)
451 });
452 row_offset += decoded;
453 if decoded <= 1 {
454 break;
455 }
456 values_offsets_vec.push(values_bytes.len());
457 }
458 *row = &row[row_offset..];
459 }
460
461 if opts.descending {
462 values_bytes.iter_mut().for_each(|o| *o = !*o);
463 }
464
465 let mut last_value_offset = 0;
466 let mut child_rows: Vec<_> = values_offsets_vec
467 .into_iter()
468 .map(|offset| {
469 let v = &values_bytes[last_value_offset..offset];
470 last_value_offset = offset;
471 v
472 })
473 .collect();
474
475 let child = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
476 assert_eq!(child.len(), 1);
477
478 let child_data = child[0].to_data();
479
480 let mut list_offsets: Vec<O> = Vec::with_capacity(rows.len());
484 let mut current_offset = O::usize_as(0);
485 for size in &list_sizes {
486 list_offsets.push(current_offset);
487 current_offset += *size;
488 }
489
490 let corrected_inner_field = match &field.data_type {
493 DataType::ListView(inner_field) | DataType::LargeListView(inner_field) => Arc::new(
494 inner_field
495 .as_ref()
496 .clone()
497 .with_data_type(child_data.data_type().clone()),
498 ),
499 _ => unreachable!(),
500 };
501
502 let null_buffer = unsafe {
504 NullBuffer::new_unchecked(BooleanBuffer::new(nulls.into(), 0, rows.len()), null_count)
505 };
506
507 GenericListViewArray::try_new(
508 corrected_inner_field,
509 ScalarBuffer::from(list_offsets),
510 ScalarBuffer::from(list_sizes),
511 child[0].clone(),
512 Some(null_buffer).filter(|n| n.null_count() > 0),
513 )
514}