Skip to main content

arrow_row/
list.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{LengthTracker, RowConverter, Rows, SortField, fixed, null_sentinel};
19use arrow_array::{
20    Array, ArrayRef, FixedSizeListArray, GenericListArray, GenericListViewArray, MapArray,
21    OffsetSizeTrait, StructArray, new_null_array,
22};
23use arrow_buffer::{
24    ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
25};
26use arrow_schema::{ArrowError, DataType, Fields, SortOptions};
27use std::{ops::Range, sync::Arc};
28
29pub(crate) trait GenericListArrayOrMap: Array {
30    type Offset: OffsetSizeTrait;
31
32    fn offsets(&self) -> &[Self::Offset];
33
34    unsafe fn from_parts_unchecked(
35        data_type: DataType,
36        offsets: Vec<Self::Offset>,
37        children: Vec<ArrayRef>,
38        null_buffer: Option<NullBuffer>,
39    ) -> Self
40    where
41        Self: Sized;
42}
43
44impl<O: OffsetSizeTrait> GenericListArrayOrMap for GenericListArray<O> {
45    type Offset = O;
46
47    fn offsets(&self) -> &[Self::Offset] {
48        self.value_offsets()
49    }
50
51    unsafe fn from_parts_unchecked(
52        data_type: DataType,
53        offsets: Vec<Self::Offset>,
54        children: Vec<ArrayRef>,
55        null_buffer: Option<NullBuffer>,
56    ) -> Self
57    where
58        Self: Sized,
59    {
60        let field = match data_type {
61            DataType::List(inner_field) | DataType::LargeList(inner_field) => inner_field,
62            _ => unreachable!(),
63        };
64
65        let child = children
66            .into_iter()
67            .next()
68            .expect("List arrays must have exactly one child array");
69
70        // SAFETY: Caller must ensure offsets are valid and correctly correspond to the children and null buffer
71        // the benefit here is to avoid validating that the offsets are monotonically increasing
72        let offset_buffer = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };
73        GenericListArray::<Self::Offset>::new(field, offset_buffer, child, null_buffer)
74    }
75}
76
77impl GenericListArrayOrMap for MapArray {
78    type Offset = i32;
79
80    fn offsets(&self) -> &[Self::Offset] {
81        self.value_offsets()
82    }
83
84    unsafe fn from_parts_unchecked(
85        data_type: DataType,
86        offsets: Vec<Self::Offset>,
87        children: Vec<ArrayRef>,
88        null_buffer: Option<NullBuffer>,
89    ) -> Self
90    where
91        Self: Sized,
92    {
93        let DataType::Map(entries_field, ordered) = data_type else {
94            unreachable!("data type must be Map for MapArray");
95        };
96
97        assert_eq!(
98            children.len(),
99            2,
100            "Map arrays must have exactly two child arrays for keys and values"
101        );
102
103        let DataType::Struct(fields) = entries_field.data_type() else {
104            unreachable!("Map entry type must be Struct");
105        };
106
107        let entries = StructArray::new(
108            fields.clone(),
109            children,
110            // Entries StructArray cannot have NullBuffer since nulls are represented at the Map level
111            None,
112        );
113
114        // SAFETY: Caller must ensure offsets are valid and correctly correspond to the children and null buffer
115        // the benefit here is to avoid validating that the offsets are monotonically increasing
116        let offset_buffer = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets)) };
117
118        MapArray::new(entries_field, offset_buffer, entries, null_buffer, ordered)
119    }
120}
121
122pub(crate) fn compute_lengths<L: GenericListArrayOrMap>(
123    lengths: &mut [usize],
124    rows: &Rows,
125    array: &L,
126) {
127    let shift = array.offsets()[0].as_usize();
128
129    lengths
130        .iter_mut()
131        .zip(array.offsets().windows(2))
132        .enumerate()
133        .for_each(|(idx, (length, offsets))| {
134            let start = offsets[0].as_usize() - shift;
135            let end = offsets[1].as_usize() - shift;
136            let range = array.is_valid(idx).then_some(start..end);
137            *length += list_like_element_encoded_len(rows, range);
138        });
139}
140
141/// Encodes the provided [`GenericListArrayOrMap`] to `out` with the provided `SortOptions`
142///
143/// `rows` should contain the encoded child elements
144pub(crate) fn encode<L: GenericListArrayOrMap>(
145    data: &mut [u8],
146    offsets: &mut [usize],
147    rows: &Rows,
148    opts: SortOptions,
149    array: &L,
150) {
151    let shift = array.offsets()[0].as_usize();
152
153    offsets
154        .iter_mut()
155        .skip(1)
156        .zip(array.offsets().windows(2))
157        .enumerate()
158        .for_each(|(idx, (offset, offsets))| {
159            let start = offsets[0].as_usize() - shift;
160            let end = offsets[1].as_usize() - shift;
161            let range = array.is_valid(idx).then_some(start..end);
162            let out = &mut data[*offset..];
163            *offset += encode_one(out, rows, range, opts)
164        });
165}
166
167#[inline]
168fn encode_one(
169    out: &mut [u8],
170    rows: &Rows,
171    range: Option<Range<usize>>,
172    opts: SortOptions,
173) -> usize {
174    match range {
175        None => super::variable::encode_null(out, opts),
176        Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
177        Some(range) => {
178            let mut offset = 0;
179            for i in range {
180                let row = rows.row(i);
181                offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
182            }
183            offset += super::variable::encode_empty(&mut out[offset..], opts);
184            offset
185        }
186    }
187}
188
189/// Decodes an array from `rows` with the provided `options`
190///
191/// # Safety
192///
193/// `rows` must contain valid data for the provided `converter`
194pub(crate) unsafe fn decode<ListLikeImpl: GenericListArrayOrMap>(
195    converter: &RowConverter,
196    rows: &mut [&[u8]],
197    field: &SortField,
198    validate_utf8: bool,
199) -> Result<ListLikeImpl, ArrowError> {
200    let opts = field.options;
201
202    let mut values_bytes = 0;
203
204    let mut offset = 0;
205    let mut offsets = Vec::with_capacity(rows.len() + 1);
206    offsets.push(ListLikeImpl::Offset::usize_as(0));
207
208    for row in rows.iter_mut() {
209        let mut row_offset = 0;
210        loop {
211            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
212                values_bytes += x.len();
213            });
214            if decoded <= 1 {
215                offsets.push(ListLikeImpl::Offset::usize_as(offset));
216                break;
217            }
218            row_offset += decoded;
219            offset += 1;
220        }
221    }
222    ListLikeImpl::Offset::from_usize(offset).expect("overflow");
223
224    let nulls = crate::variable::decode_nulls_sentinel(rows, opts);
225
226    let mut values_offsets = Vec::with_capacity(offset);
227    let mut values_bytes = Vec::with_capacity(values_bytes);
228    for row in rows.iter_mut() {
229        let mut row_offset = 0;
230        loop {
231            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
232                values_bytes.extend_from_slice(x)
233            });
234            row_offset += decoded;
235            if decoded <= 1 {
236                break;
237            }
238            values_offsets.push(values_bytes.len());
239        }
240        *row = &row[row_offset..];
241    }
242
243    if opts.descending {
244        values_bytes.iter_mut().for_each(|o| *o = !*o);
245    }
246
247    let mut last_value_offset = 0;
248    let mut child_rows: Vec<_> = values_offsets
249        .into_iter()
250        .map(|offset| {
251            let v = &values_bytes[last_value_offset..offset];
252            last_value_offset = offset;
253            v
254        })
255        .collect();
256
257    let children = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
258
259    // Since RowConverter flattens certain data types (i.e. Dictionary),
260    // we need to use updated data type instead of original field
261    let corrected_type = match &field.data_type {
262        DataType::List(inner_field) => {
263            assert_eq!(children.len(), 1);
264            DataType::List(Arc::new(
265                inner_field
266                    .as_ref()
267                    .clone()
268                    .with_data_type(children[0].data_type().clone()),
269            ))
270        }
271        DataType::LargeList(inner_field) => {
272            assert_eq!(children.len(), 1);
273            DataType::LargeList(Arc::new(
274                inner_field
275                    .as_ref()
276                    .clone()
277                    .with_data_type(children[0].data_type().clone()),
278            ))
279        }
280        DataType::Map(inner_field, ordered) => {
281            let DataType::Struct(entries_field) = inner_field.data_type() else {
282                return Err(ArrowError::InvalidArgumentError(format!(
283                    "Expected Map entry type to be Struct, found: {}",
284                    inner_field.data_type()
285                )));
286            };
287            assert_eq!(
288                children.len(),
289                2,
290                "Map arrays must have exactly two child arrays for keys and values"
291            );
292            let key_field = entries_field[0]
293                .as_ref()
294                .clone()
295                .with_data_type(children[0].data_type().clone());
296            let value_field = entries_field[1]
297                .as_ref()
298                .clone()
299                .with_data_type(children[1].data_type().clone());
300
301            let entries_fields = Fields::from(vec![key_field, value_field]);
302
303            DataType::Map(
304                Arc::new(
305                    inner_field
306                        .as_ref()
307                        .clone()
308                        .with_data_type(DataType::Struct(entries_fields)),
309                ),
310                *ordered,
311            )
312        }
313        _ => unreachable!(),
314    };
315
316    Ok(unsafe { ListLikeImpl::from_parts_unchecked(corrected_type, offsets, children, nulls) })
317}
318
319pub fn compute_lengths_fixed_size_list(
320    tracker: &mut LengthTracker,
321    rows: &Rows,
322    array: &FixedSizeListArray,
323) {
324    let value_length = array.value_length().as_usize();
325    tracker.push_variable((0..array.len()).map(|idx| {
326        match array.is_valid(idx) {
327            true => {
328                1 + ((idx * value_length)..(idx + 1) * value_length)
329                    .map(|child_idx| rows.row(child_idx).as_ref().len())
330                    .sum::<usize>()
331            }
332            false => 1,
333        }
334    }))
335}
336
337/// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions`
338///
339/// `rows` should contain the encoded child elements
340pub fn encode_fixed_size_list(
341    data: &mut [u8],
342    offsets: &mut [usize],
343    rows: &Rows,
344    opts: SortOptions,
345    array: &FixedSizeListArray,
346) {
347    let null_sentinel = null_sentinel(opts);
348    offsets
349        .iter_mut()
350        .skip(1)
351        .enumerate()
352        .for_each(|(idx, offset)| {
353            let value_length = array.value_length().as_usize();
354            match array.is_valid(idx) {
355                true => {
356                    data[*offset] = 0x01;
357                    *offset += 1;
358                    for child_idx in (idx * value_length)..(idx + 1) * value_length {
359                        let row = rows.row(child_idx);
360                        let end_offset = *offset + row.as_ref().len();
361                        data[*offset..end_offset].copy_from_slice(row.as_ref());
362                        *offset = end_offset;
363                    }
364                }
365                false => {
366                    data[*offset] = null_sentinel;
367                    *offset += 1;
368                }
369            };
370        })
371}
372
373/// Decodes a fixed size list array from `rows` with the provided `options`
374///
375/// # Safety
376///
377/// `rows` must contain valid data for the provided `converter`
378pub unsafe fn decode_fixed_size_list(
379    converter: &RowConverter,
380    rows: &mut [&[u8]],
381    field: &SortField,
382    validate_utf8: bool,
383    value_length: usize,
384) -> Result<FixedSizeListArray, ArrowError> {
385    let list_type = &field.data_type;
386    let DataType::FixedSizeList(element_field, size) = list_type else {
387        return Err(ArrowError::InvalidArgumentError(format!(
388            "Expected FixedSizeListArray, found: {list_type}",
389        )));
390    };
391
392    let num_rows = rows.len();
393    let nulls = fixed::decode_nulls(rows);
394
395    let null_element_encoded =
396        converter.convert_columns(&[new_null_array(element_field.data_type(), 1)])?;
397    let null_element_encoded = null_element_encoded.row(0);
398    let null_element_slice = null_element_encoded.as_ref();
399
400    let mut child_rows = Vec::new();
401    for row in rows {
402        let valid = row[0] == 1;
403        let mut row_offset = 1;
404        if !valid {
405            for _ in 0..value_length {
406                child_rows.push(null_element_slice);
407            }
408        } else {
409            for _ in 0..value_length {
410                let mut temp_child_rows = vec![&row[row_offset..]];
411                unsafe { converter.convert_raw(&mut temp_child_rows, validate_utf8) }?;
412                let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
413                let next_offset = row_offset + decoded_bytes;
414                child_rows.push(&row[row_offset..next_offset]);
415                row_offset = next_offset;
416            }
417        }
418        *row = &row[row_offset..]; // Update row for the next decoder
419    }
420
421    let mut children = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
422    assert_eq!(children.len(), 1);
423
424    FixedSizeListArray::try_new_with_length(
425        Arc::clone(element_field),
426        *size,
427        children.pop().unwrap(),
428        nulls,
429        num_rows,
430    )
431}
432
433/// Computes the encoded length for a single list/map element given its child rows.
434///
435/// This is used by list types (List, LargeList, ListView, LargeListView) and by Map to determine
436/// the encoded length of a list element/map entry. For null elements, returns 1 (null sentinel only).
437/// For valid elements, returns 1 + the sum of padded lengths for each child row.
438#[inline]
439fn list_like_element_encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
440    match range {
441        None => 1,
442        Some(range) => {
443            1 + range
444                .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
445                .sum::<usize>()
446        }
447    }
448}
449
450/// Computes the encoded lengths for a `GenericListViewArray`
451///
452/// `rows` should contain the encoded child elements
453pub fn compute_lengths_list_view<O: OffsetSizeTrait>(
454    lengths: &mut [usize],
455    rows: &Rows,
456    array: &GenericListViewArray<O>,
457    shift: usize,
458) {
459    let offsets = array.value_offsets();
460    let sizes = array.value_sizes();
461
462    lengths.iter_mut().enumerate().for_each(|(idx, length)| {
463        let size = sizes[idx].as_usize();
464        let range = array.is_valid(idx).then(|| {
465            // For empty lists (size=0), offset may be arbitrary and could underflow when shifted.
466            // Use 0 as start since the range is empty anyway.
467            let start = if size > 0 {
468                offsets[idx].as_usize() - shift
469            } else {
470                0
471            };
472            start..start + size
473        });
474        *length += list_like_element_encoded_len(rows, range);
475    });
476}
477
478/// Encodes the provided `GenericListViewArray` to `out` with the provided `SortOptions`
479///
480/// `rows` should contain the encoded child elements
481pub fn encode_list_view<O: OffsetSizeTrait>(
482    data: &mut [u8],
483    out_offsets: &mut [usize],
484    rows: &Rows,
485    opts: SortOptions,
486    array: &GenericListViewArray<O>,
487    shift: usize,
488) {
489    let offsets = array.value_offsets();
490    let sizes = array.value_sizes();
491
492    out_offsets
493        .iter_mut()
494        .skip(1)
495        .enumerate()
496        .for_each(|(idx, offset)| {
497            let size = sizes[idx].as_usize();
498            let range = array.is_valid(idx).then(|| {
499                // For empty lists (size=0), offset may be arbitrary and could underflow when shifted.
500                // Use 0 as start since the range is empty anyway.
501                let start = if size > 0 {
502                    offsets[idx].as_usize() - shift
503                } else {
504                    0
505                };
506                start..start + size
507            });
508            let out = &mut data[*offset..];
509            *offset += encode_one(out, rows, range, opts)
510        });
511}
512
513/// Decodes a `GenericListViewArray` from `rows` with the provided `options`
514///
515/// # Safety
516///
517/// `rows` must contain valid data for the provided `converter`
518pub unsafe fn decode_list_view<O: OffsetSizeTrait>(
519    converter: &RowConverter,
520    rows: &mut [&[u8]],
521    field: &SortField,
522    validate_utf8: bool,
523) -> Result<GenericListViewArray<O>, ArrowError> {
524    let opts = field.options;
525
526    let mut values_bytes = 0;
527
528    let mut child_count = 0usize;
529    let mut list_sizes: Vec<O> = Vec::with_capacity(rows.len());
530
531    // First pass: count children and compute sizes
532    for row in rows.iter_mut() {
533        let mut row_offset = 0;
534        let mut list_size = 0usize;
535        loop {
536            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
537                values_bytes += x.len();
538            });
539            if decoded <= 1 {
540                list_sizes.push(O::usize_as(list_size));
541                break;
542            }
543            row_offset += decoded;
544            child_count += 1;
545            list_size += 1;
546        }
547    }
548    O::from_usize(child_count).expect("overflow");
549
550    let mut null_count = 0;
551    let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
552        let valid = rows[x][0] != null_sentinel(opts);
553        null_count += !valid as usize;
554        valid
555    });
556
557    let mut values_offsets_vec = Vec::with_capacity(child_count);
558    let mut values_bytes = Vec::with_capacity(values_bytes);
559    for row in rows.iter_mut() {
560        let mut row_offset = 0;
561        loop {
562            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
563                values_bytes.extend_from_slice(x)
564            });
565            row_offset += decoded;
566            if decoded <= 1 {
567                break;
568            }
569            values_offsets_vec.push(values_bytes.len());
570        }
571        *row = &row[row_offset..];
572    }
573
574    if opts.descending {
575        values_bytes.iter_mut().for_each(|o| *o = !*o);
576    }
577
578    let mut last_value_offset = 0;
579    let mut child_rows: Vec<_> = values_offsets_vec
580        .into_iter()
581        .map(|offset| {
582            let v = &values_bytes[last_value_offset..offset];
583            last_value_offset = offset;
584            v
585        })
586        .collect();
587
588    let child = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
589    assert_eq!(child.len(), 1);
590
591    let child_data = child[0].to_data();
592
593    // Technically ListViews don't have to have offsets follow each other precisely, but can be
594    // reused. However, because we cannot preserve that sharing within the row format, this is the
595    // best we can do.
596    let mut list_offsets: Vec<O> = Vec::with_capacity(rows.len());
597    let mut current_offset = O::usize_as(0);
598    for size in &list_sizes {
599        list_offsets.push(current_offset);
600        current_offset += *size;
601    }
602
603    // Since RowConverter flattens certain data types (i.e. Dictionary),
604    // we need to use updated data type instead of original field
605    let corrected_inner_field = match &field.data_type {
606        DataType::ListView(inner_field) | DataType::LargeListView(inner_field) => Arc::new(
607            inner_field
608                .as_ref()
609                .clone()
610                .with_data_type(child_data.data_type().clone()),
611        ),
612        _ => unreachable!(),
613    };
614
615    // SAFETY: null_count was computed correctly when building the nulls buffer above
616    let null_buffer = unsafe {
617        NullBuffer::new_unchecked(BooleanBuffer::new(nulls.into(), 0, rows.len()), null_count)
618    };
619
620    GenericListViewArray::try_new(
621        corrected_inner_field,
622        ScalarBuffer::from(list_offsets),
623        ScalarBuffer::from(list_sizes),
624        child[0].clone(),
625        Some(null_buffer).filter(|n| n.null_count() > 0),
626    )
627}