1use crate::{LengthTracker, RowConverter, Rows, SortField, fixed, null_sentinel};
19use arrow_array::{
20 Array, ArrayRef, FixedSizeListArray, GenericListArray, GenericListViewArray, MapArray,
21 OffsetSizeTrait, StructArray, new_null_array,
22};
23use arrow_buffer::{
24 ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
25};
26use arrow_schema::{ArrowError, DataType, Fields, SortOptions};
27use std::{ops::Range, sync::Arc};
28
29pub(crate) trait GenericListArrayOrMap: Array {
30 type Offset: OffsetSizeTrait;
31
32 fn offsets(&self) -> &[Self::Offset];
33
34 unsafe fn from_parts_unchecked(
35 data_type: DataType,
36 offsets: Vec<Self::Offset>,
37 children: Vec<ArrayRef>,
38 null_buffer: Option<NullBuffer>,
39 ) -> Self
40 where
41 Self: Sized;
42}
43
44impl<O: OffsetSizeTrait> GenericListArrayOrMap for GenericListArray<O> {
45 type Offset = O;
46
47 fn offsets(&self) -> &[Self::Offset] {
48 self.value_offsets()
49 }
50
51 unsafe fn from_parts_unchecked(
52 data_type: DataType,
53 offsets: Vec<Self::Offset>,
54 children: Vec<ArrayRef>,
55 null_buffer: Option<NullBuffer>,
56 ) -> Self
57 where
58 Self: Sized,
59 {
60 let field = match data_type {
61 DataType::List(inner_field) | DataType::LargeList(inner_field) => inner_field,
62 _ => unreachable!(),
63 };
64
65 let child = children
66 .into_iter()
67 .next()
68 .expect("List arrays must have exactly one child array");
69
70 let offset_buffer = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };
73 GenericListArray::<Self::Offset>::new(field, offset_buffer, child, null_buffer)
74 }
75}
76
77impl GenericListArrayOrMap for MapArray {
78 type Offset = i32;
79
80 fn offsets(&self) -> &[Self::Offset] {
81 self.value_offsets()
82 }
83
84 unsafe fn from_parts_unchecked(
85 data_type: DataType,
86 offsets: Vec<Self::Offset>,
87 children: Vec<ArrayRef>,
88 null_buffer: Option<NullBuffer>,
89 ) -> Self
90 where
91 Self: Sized,
92 {
93 let DataType::Map(entries_field, ordered) = data_type else {
94 unreachable!("data type must be Map for MapArray");
95 };
96
97 assert_eq!(
98 children.len(),
99 2,
100 "Map arrays must have exactly two child arrays for keys and values"
101 );
102
103 let DataType::Struct(fields) = entries_field.data_type() else {
104 unreachable!("Map entry type must be Struct");
105 };
106
107 let entries = StructArray::new(
108 fields.clone(),
109 children,
110 None,
112 );
113
114 let offset_buffer = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };
117
118 MapArray::new(entries_field, offset_buffer, entries, null_buffer, ordered)
119 }
120}
121
122pub(crate) fn compute_lengths<L: GenericListArrayOrMap>(
123 lengths: &mut [usize],
124 rows: &Rows,
125 array: &L,
126) {
127 let shift = array.offsets()[0].as_usize();
128
129 lengths
130 .iter_mut()
131 .zip(array.offsets().windows(2))
132 .enumerate()
133 .for_each(|(idx, (length, offsets))| {
134 let start = offsets[0].as_usize() - shift;
135 let end = offsets[1].as_usize() - shift;
136 let range = array.is_valid(idx).then_some(start..end);
137 *length += list_like_element_encoded_len(rows, range);
138 });
139}
140
141pub(crate) fn encode<L: GenericListArrayOrMap>(
145 data: &mut [u8],
146 offsets: &mut [usize],
147 rows: &Rows,
148 opts: SortOptions,
149 array: &L,
150) {
151 let shift = array.offsets()[0].as_usize();
152
153 offsets
154 .iter_mut()
155 .skip(1)
156 .zip(array.offsets().windows(2))
157 .enumerate()
158 .for_each(|(idx, (offset, offsets))| {
159 let start = offsets[0].as_usize() - shift;
160 let end = offsets[1].as_usize() - shift;
161 let range = array.is_valid(idx).then_some(start..end);
162 let out = &mut data[*offset..];
163 *offset += encode_one(out, rows, range, opts)
164 });
165}
166
167#[inline]
168fn encode_one(
169 out: &mut [u8],
170 rows: &Rows,
171 range: Option<Range<usize>>,
172 opts: SortOptions,
173) -> usize {
174 match range {
175 None => super::variable::encode_null(out, opts),
176 Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
177 Some(range) => {
178 let mut offset = 0;
179 for i in range {
180 let row = rows.row(i);
181 offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
182 }
183 offset += super::variable::encode_empty(&mut out[offset..], opts);
184 offset
185 }
186 }
187}
188
189pub(crate) unsafe fn decode<ListLikeImpl: GenericListArrayOrMap>(
195 converter: &RowConverter,
196 rows: &mut [&[u8]],
197 field: &SortField,
198 validate_utf8: bool,
199) -> Result<ListLikeImpl, ArrowError> {
200 let opts = field.options;
201
202 let mut values_bytes = 0;
203
204 let mut offset = 0;
205 let mut offsets = Vec::with_capacity(rows.len() + 1);
206 offsets.push(ListLikeImpl::Offset::usize_as(0));
207
208 for row in rows.iter_mut() {
209 let mut row_offset = 0;
210 loop {
211 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
212 values_bytes += x.len();
213 });
214 if decoded <= 1 {
215 offsets.push(ListLikeImpl::Offset::usize_as(offset));
216 break;
217 }
218 row_offset += decoded;
219 offset += 1;
220 }
221 }
222 ListLikeImpl::Offset::from_usize(offset).expect("overflow");
223
224 let nulls = crate::variable::decode_nulls_sentinel(rows, opts);
225
226 let mut values_offsets = Vec::with_capacity(offset);
227 let mut values_bytes = Vec::with_capacity(values_bytes);
228 for row in rows.iter_mut() {
229 let mut row_offset = 0;
230 loop {
231 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
232 values_bytes.extend_from_slice(x)
233 });
234 row_offset += decoded;
235 if decoded <= 1 {
236 break;
237 }
238 values_offsets.push(values_bytes.len());
239 }
240 *row = &row[row_offset..];
241 }
242
243 if opts.descending {
244 values_bytes.iter_mut().for_each(|o| *o = !*o);
245 }
246
247 let mut last_value_offset = 0;
248 let mut child_rows: Vec<_> = values_offsets
249 .into_iter()
250 .map(|offset| {
251 let v = &values_bytes[last_value_offset..offset];
252 last_value_offset = offset;
253 v
254 })
255 .collect();
256
257 let children = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
258
259 let corrected_type = match &field.data_type {
262 DataType::List(inner_field) => {
263 assert_eq!(children.len(), 1);
264 DataType::List(Arc::new(
265 inner_field
266 .as_ref()
267 .clone()
268 .with_data_type(children[0].data_type().clone()),
269 ))
270 }
271 DataType::LargeList(inner_field) => {
272 assert_eq!(children.len(), 1);
273 DataType::LargeList(Arc::new(
274 inner_field
275 .as_ref()
276 .clone()
277 .with_data_type(children[0].data_type().clone()),
278 ))
279 }
280 DataType::Map(inner_field, ordered) => {
281 let DataType::Struct(entries_field) = inner_field.data_type() else {
282 return Err(ArrowError::InvalidArgumentError(format!(
283 "Expected Map entry type to be Struct, found: {}",
284 inner_field.data_type()
285 )));
286 };
287 assert_eq!(
288 children.len(),
289 2,
290 "Map arrays must have exactly two child arrays for keys and values"
291 );
292 let key_field = entries_field[0]
293 .as_ref()
294 .clone()
295 .with_data_type(children[0].data_type().clone());
296 let value_field = entries_field[1]
297 .as_ref()
298 .clone()
299 .with_data_type(children[1].data_type().clone());
300
301 let entries_fields = Fields::from(vec![key_field, value_field]);
302
303 DataType::Map(
304 Arc::new(
305 inner_field
306 .as_ref()
307 .clone()
308 .with_data_type(DataType::Struct(entries_fields)),
309 ),
310 *ordered,
311 )
312 }
313 _ => unreachable!(),
314 };
315
316 Ok(unsafe { ListLikeImpl::from_parts_unchecked(corrected_type, offsets, children, nulls) })
317}
318
319pub fn compute_lengths_fixed_size_list(
320 tracker: &mut LengthTracker,
321 rows: &Rows,
322 array: &FixedSizeListArray,
323) {
324 let value_length = array.value_length().as_usize();
325 tracker.push_variable((0..array.len()).map(|idx| {
326 match array.is_valid(idx) {
327 true => {
328 1 + ((idx * value_length)..(idx + 1) * value_length)
329 .map(|child_idx| rows.row(child_idx).as_ref().len())
330 .sum::<usize>()
331 }
332 false => 1,
333 }
334 }))
335}
336
337pub fn encode_fixed_size_list(
341 data: &mut [u8],
342 offsets: &mut [usize],
343 rows: &Rows,
344 opts: SortOptions,
345 array: &FixedSizeListArray,
346) {
347 let null_sentinel = null_sentinel(opts);
348 offsets
349 .iter_mut()
350 .skip(1)
351 .enumerate()
352 .for_each(|(idx, offset)| {
353 let value_length = array.value_length().as_usize();
354 match array.is_valid(idx) {
355 true => {
356 data[*offset] = 0x01;
357 *offset += 1;
358 for child_idx in (idx * value_length)..(idx + 1) * value_length {
359 let row = rows.row(child_idx);
360 let end_offset = *offset + row.as_ref().len();
361 data[*offset..end_offset].copy_from_slice(row.as_ref());
362 *offset = end_offset;
363 }
364 }
365 false => {
366 data[*offset] = null_sentinel;
367 *offset += 1;
368 }
369 };
370 })
371}
372
373pub unsafe fn decode_fixed_size_list(
379 converter: &RowConverter,
380 rows: &mut [&[u8]],
381 field: &SortField,
382 validate_utf8: bool,
383 value_length: usize,
384) -> Result<FixedSizeListArray, ArrowError> {
385 let list_type = &field.data_type;
386 let DataType::FixedSizeList(element_field, size) = list_type else {
387 return Err(ArrowError::InvalidArgumentError(format!(
388 "Expected FixedSizeListArray, found: {list_type}",
389 )));
390 };
391
392 let num_rows = rows.len();
393 let nulls = fixed::decode_nulls(rows);
394
395 let null_element_encoded =
396 converter.convert_columns(&[new_null_array(element_field.data_type(), 1)])?;
397 let null_element_encoded = null_element_encoded.row(0);
398 let null_element_slice = null_element_encoded.as_ref();
399
400 let mut child_rows = Vec::new();
401 for row in rows {
402 let valid = row[0] == 1;
403 let mut row_offset = 1;
404 if !valid {
405 for _ in 0..value_length {
406 child_rows.push(null_element_slice);
407 }
408 } else {
409 for _ in 0..value_length {
410 let mut temp_child_rows = vec![&row[row_offset..]];
411 unsafe { converter.convert_raw(&mut temp_child_rows, validate_utf8) }?;
412 let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
413 let next_offset = row_offset + decoded_bytes;
414 child_rows.push(&row[row_offset..next_offset]);
415 row_offset = next_offset;
416 }
417 }
418 *row = &row[row_offset..]; }
420
421 let mut children = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
422 assert_eq!(children.len(), 1);
423
424 FixedSizeListArray::try_new_with_length(
425 Arc::clone(element_field),
426 *size,
427 children.pop().unwrap(),
428 nulls,
429 num_rows,
430 )
431}
432
433#[inline]
439fn list_like_element_encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
440 match range {
441 None => 1,
442 Some(range) => {
443 1 + range
444 .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
445 .sum::<usize>()
446 }
447 }
448}
449
450pub fn compute_lengths_list_view<O: OffsetSizeTrait>(
454 lengths: &mut [usize],
455 rows: &Rows,
456 array: &GenericListViewArray<O>,
457 shift: usize,
458) {
459 let offsets = array.value_offsets();
460 let sizes = array.value_sizes();
461
462 lengths.iter_mut().enumerate().for_each(|(idx, length)| {
463 let size = sizes[idx].as_usize();
464 let range = array.is_valid(idx).then(|| {
465 let start = if size > 0 {
468 offsets[idx].as_usize() - shift
469 } else {
470 0
471 };
472 start..start + size
473 });
474 *length += list_like_element_encoded_len(rows, range);
475 });
476}
477
478pub fn encode_list_view<O: OffsetSizeTrait>(
482 data: &mut [u8],
483 out_offsets: &mut [usize],
484 rows: &Rows,
485 opts: SortOptions,
486 array: &GenericListViewArray<O>,
487 shift: usize,
488) {
489 let offsets = array.value_offsets();
490 let sizes = array.value_sizes();
491
492 out_offsets
493 .iter_mut()
494 .skip(1)
495 .enumerate()
496 .for_each(|(idx, offset)| {
497 let size = sizes[idx].as_usize();
498 let range = array.is_valid(idx).then(|| {
499 let start = if size > 0 {
502 offsets[idx].as_usize() - shift
503 } else {
504 0
505 };
506 start..start + size
507 });
508 let out = &mut data[*offset..];
509 *offset += encode_one(out, rows, range, opts)
510 });
511}
512
513pub unsafe fn decode_list_view<O: OffsetSizeTrait>(
519 converter: &RowConverter,
520 rows: &mut [&[u8]],
521 field: &SortField,
522 validate_utf8: bool,
523) -> Result<GenericListViewArray<O>, ArrowError> {
524 let opts = field.options;
525
526 let mut values_bytes = 0;
527
528 let mut child_count = 0usize;
529 let mut list_sizes: Vec<O> = Vec::with_capacity(rows.len());
530
531 for row in rows.iter_mut() {
533 let mut row_offset = 0;
534 let mut list_size = 0usize;
535 loop {
536 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
537 values_bytes += x.len();
538 });
539 if decoded <= 1 {
540 list_sizes.push(O::usize_as(list_size));
541 break;
542 }
543 row_offset += decoded;
544 child_count += 1;
545 list_size += 1;
546 }
547 }
548 O::from_usize(child_count).expect("overflow");
549
550 let mut null_count = 0;
551 let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
552 let valid = rows[x][0] != null_sentinel(opts);
553 null_count += !valid as usize;
554 valid
555 });
556
557 let mut values_offsets_vec = Vec::with_capacity(child_count);
558 let mut values_bytes = Vec::with_capacity(values_bytes);
559 for row in rows.iter_mut() {
560 let mut row_offset = 0;
561 loop {
562 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
563 values_bytes.extend_from_slice(x)
564 });
565 row_offset += decoded;
566 if decoded <= 1 {
567 break;
568 }
569 values_offsets_vec.push(values_bytes.len());
570 }
571 *row = &row[row_offset..];
572 }
573
574 if opts.descending {
575 values_bytes.iter_mut().for_each(|o| *o = !*o);
576 }
577
578 let mut last_value_offset = 0;
579 let mut child_rows: Vec<_> = values_offsets_vec
580 .into_iter()
581 .map(|offset| {
582 let v = &values_bytes[last_value_offset..offset];
583 last_value_offset = offset;
584 v
585 })
586 .collect();
587
588 let child = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
589 assert_eq!(child.len(), 1);
590
591 let child_data = child[0].to_data();
592
593 let mut list_offsets: Vec<O> = Vec::with_capacity(rows.len());
597 let mut current_offset = O::usize_as(0);
598 for size in &list_sizes {
599 list_offsets.push(current_offset);
600 current_offset += *size;
601 }
602
603 let corrected_inner_field = match &field.data_type {
606 DataType::ListView(inner_field) | DataType::LargeListView(inner_field) => Arc::new(
607 inner_field
608 .as_ref()
609 .clone()
610 .with_data_type(child_data.data_type().clone()),
611 ),
612 _ => unreachable!(),
613 };
614
615 let null_buffer = unsafe {
617 NullBuffer::new_unchecked(BooleanBuffer::new(nulls.into(), 0, rows.len()), null_count)
618 };
619
620 GenericListViewArray::try_new(
621 corrected_inner_field,
622 ScalarBuffer::from(list_offsets),
623 ScalarBuffer::from(list_sizes),
624 child[0].clone(),
625 Some(null_buffer).filter(|n| n.null_count() > 0),
626 )
627}