Skip to main content

arrow_row/
list.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{LengthTracker, RowConverter, Rows, SortField, fixed, null_sentinel};
19use arrow_array::{
20    Array, FixedSizeListArray, GenericListArray, GenericListViewArray, OffsetSizeTrait,
21    new_null_array,
22};
23use arrow_buffer::{
24    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, ScalarBuffer,
25};
26use arrow_data::ArrayDataBuilder;
27use arrow_schema::{ArrowError, DataType, SortOptions};
28use std::{ops::Range, sync::Arc};
29
30pub fn compute_lengths<O: OffsetSizeTrait>(
31    lengths: &mut [usize],
32    rows: &Rows,
33    array: &GenericListArray<O>,
34) {
35    let shift = array.value_offsets()[0].as_usize();
36
37    lengths
38        .iter_mut()
39        .zip(array.value_offsets().windows(2))
40        .enumerate()
41        .for_each(|(idx, (length, offsets))| {
42            let start = offsets[0].as_usize() - shift;
43            let end = offsets[1].as_usize() - shift;
44            let range = array.is_valid(idx).then_some(start..end);
45            *length += list_element_encoded_len(rows, range);
46        });
47}
48
49/// Encodes the provided `GenericListArray` to `out` with the provided `SortOptions`
50///
51/// `rows` should contain the encoded child elements
52pub fn encode<O: OffsetSizeTrait>(
53    data: &mut [u8],
54    offsets: &mut [usize],
55    rows: &Rows,
56    opts: SortOptions,
57    array: &GenericListArray<O>,
58) {
59    let shift = array.value_offsets()[0].as_usize();
60
61    offsets
62        .iter_mut()
63        .skip(1)
64        .zip(array.value_offsets().windows(2))
65        .enumerate()
66        .for_each(|(idx, (offset, offsets))| {
67            let start = offsets[0].as_usize() - shift;
68            let end = offsets[1].as_usize() - shift;
69            let range = array.is_valid(idx).then_some(start..end);
70            let out = &mut data[*offset..];
71            *offset += encode_one(out, rows, range, opts)
72        });
73}
74
75#[inline]
76fn encode_one(
77    out: &mut [u8],
78    rows: &Rows,
79    range: Option<Range<usize>>,
80    opts: SortOptions,
81) -> usize {
82    match range {
83        None => super::variable::encode_null(out, opts),
84        Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
85        Some(range) => {
86            let mut offset = 0;
87            for i in range {
88                let row = rows.row(i);
89                offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
90            }
91            offset += super::variable::encode_empty(&mut out[offset..], opts);
92            offset
93        }
94    }
95}
96
97/// Decodes an array from `rows` with the provided `options`
98///
99/// # Safety
100///
101/// `rows` must contain valid data for the provided `converter`
102pub unsafe fn decode<O: OffsetSizeTrait>(
103    converter: &RowConverter,
104    rows: &mut [&[u8]],
105    field: &SortField,
106    validate_utf8: bool,
107) -> Result<GenericListArray<O>, ArrowError> {
108    let opts = field.options;
109
110    let mut values_bytes = 0;
111
112    let mut offset = 0;
113    let mut offsets = Vec::with_capacity(rows.len() + 1);
114    offsets.push(O::usize_as(0));
115
116    for row in rows.iter_mut() {
117        let mut row_offset = 0;
118        loop {
119            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
120                values_bytes += x.len();
121            });
122            if decoded <= 1 {
123                offsets.push(O::usize_as(offset));
124                break;
125            }
126            row_offset += decoded;
127            offset += 1;
128        }
129    }
130    O::from_usize(offset).expect("overflow");
131
132    let mut null_count = 0;
133    let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
134        let valid = rows[x][0] != null_sentinel(opts);
135        null_count += !valid as usize;
136        valid
137    });
138
139    let mut values_offsets = Vec::with_capacity(offset);
140    let mut values_bytes = Vec::with_capacity(values_bytes);
141    for row in rows.iter_mut() {
142        let mut row_offset = 0;
143        loop {
144            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
145                values_bytes.extend_from_slice(x)
146            });
147            row_offset += decoded;
148            if decoded <= 1 {
149                break;
150            }
151            values_offsets.push(values_bytes.len());
152        }
153        *row = &row[row_offset..];
154    }
155
156    if opts.descending {
157        values_bytes.iter_mut().for_each(|o| *o = !*o);
158    }
159
160    let mut last_value_offset = 0;
161    let mut child_rows: Vec<_> = values_offsets
162        .into_iter()
163        .map(|offset| {
164            let v = &values_bytes[last_value_offset..offset];
165            last_value_offset = offset;
166            v
167        })
168        .collect();
169
170    let child = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
171    assert_eq!(child.len(), 1);
172
173    let child_data = child[0].to_data();
174
175    // Since RowConverter flattens certain data types (i.e. Dictionary),
176    // we need to use updated data type instead of original field
177    let corrected_type = match &field.data_type {
178        DataType::List(inner_field) => DataType::List(Arc::new(
179            inner_field
180                .as_ref()
181                .clone()
182                .with_data_type(child_data.data_type().clone()),
183        )),
184        DataType::LargeList(inner_field) => DataType::LargeList(Arc::new(
185            inner_field
186                .as_ref()
187                .clone()
188                .with_data_type(child_data.data_type().clone()),
189        )),
190        _ => unreachable!(),
191    };
192
193    let builder = ArrayDataBuilder::new(corrected_type)
194        .len(rows.len())
195        .null_count(null_count)
196        .null_bit_buffer(Some(nulls.into()))
197        .add_buffer(Buffer::from_vec(offsets))
198        .add_child_data(child_data);
199
200    Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
201}
202
203pub fn compute_lengths_fixed_size_list(
204    tracker: &mut LengthTracker,
205    rows: &Rows,
206    array: &FixedSizeListArray,
207) {
208    let value_length = array.value_length().as_usize();
209    tracker.push_variable((0..array.len()).map(|idx| {
210        match array.is_valid(idx) {
211            true => {
212                1 + ((idx * value_length)..(idx + 1) * value_length)
213                    .map(|child_idx| rows.row(child_idx).as_ref().len())
214                    .sum::<usize>()
215            }
216            false => 1,
217        }
218    }))
219}
220
221/// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions`
222///
223/// `rows` should contain the encoded child elements
224pub fn encode_fixed_size_list(
225    data: &mut [u8],
226    offsets: &mut [usize],
227    rows: &Rows,
228    opts: SortOptions,
229    array: &FixedSizeListArray,
230) {
231    let null_sentinel = null_sentinel(opts);
232    offsets
233        .iter_mut()
234        .skip(1)
235        .enumerate()
236        .for_each(|(idx, offset)| {
237            let value_length = array.value_length().as_usize();
238            match array.is_valid(idx) {
239                true => {
240                    data[*offset] = 0x01;
241                    *offset += 1;
242                    for child_idx in (idx * value_length)..(idx + 1) * value_length {
243                        let row = rows.row(child_idx);
244                        let end_offset = *offset + row.as_ref().len();
245                        data[*offset..end_offset].copy_from_slice(row.as_ref());
246                        *offset = end_offset;
247                    }
248                }
249                false => {
250                    data[*offset] = null_sentinel;
251                    *offset += 1;
252                }
253            };
254        })
255}
256
257/// Decodes a fixed size list array from `rows` with the provided `options`
258///
259/// # Safety
260///
261/// `rows` must contain valid data for the provided `converter`
262pub unsafe fn decode_fixed_size_list(
263    converter: &RowConverter,
264    rows: &mut [&[u8]],
265    field: &SortField,
266    validate_utf8: bool,
267    value_length: usize,
268) -> Result<FixedSizeListArray, ArrowError> {
269    let list_type = &field.data_type;
270    let element_type = match list_type {
271        DataType::FixedSizeList(element_field, _) => element_field.data_type(),
272        _ => {
273            return Err(ArrowError::InvalidArgumentError(format!(
274                "Expected FixedSizeListArray, found: {list_type}",
275            )));
276        }
277    };
278
279    let len = rows.len();
280    let (null_count, nulls) = fixed::decode_nulls(rows);
281
282    let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?;
283    let null_element_encoded = null_element_encoded.row(0);
284    let null_element_slice = null_element_encoded.as_ref();
285
286    let mut child_rows = Vec::new();
287    for row in rows {
288        let valid = row[0] == 1;
289        let mut row_offset = 1;
290        if !valid {
291            for _ in 0..value_length {
292                child_rows.push(null_element_slice);
293            }
294        } else {
295            for _ in 0..value_length {
296                let mut temp_child_rows = vec![&row[row_offset..]];
297                unsafe { converter.convert_raw(&mut temp_child_rows, validate_utf8) }?;
298                let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
299                let next_offset = row_offset + decoded_bytes;
300                child_rows.push(&row[row_offset..next_offset]);
301                row_offset = next_offset;
302            }
303        }
304        *row = &row[row_offset..]; // Update row for the next decoder
305    }
306
307    let children = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
308    let child_data = children.iter().map(|c| c.to_data()).collect();
309    let builder = ArrayDataBuilder::new(list_type.clone())
310        .len(len)
311        .null_count(null_count)
312        .null_bit_buffer(Some(nulls))
313        .child_data(child_data);
314
315    Ok(FixedSizeListArray::from(unsafe {
316        builder.build_unchecked()
317    }))
318}
319
320/// Computes the encoded length for a single list element given its child rows.
321///
322/// This is used by list types (List, LargeList, ListView, LargeListView) to determine
323/// the encoded length of a list element. For null elements, returns 1 (null sentinel only).
324/// For valid elements, returns 1 + the sum of padded lengths for each child row.
325#[inline]
326fn list_element_encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
327    match range {
328        None => 1,
329        Some(range) => {
330            1 + range
331                .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
332                .sum::<usize>()
333        }
334    }
335}
336
337/// Computes the encoded lengths for a `GenericListViewArray`
338///
339/// `rows` should contain the encoded child elements
340pub fn compute_lengths_list_view<O: OffsetSizeTrait>(
341    lengths: &mut [usize],
342    rows: &Rows,
343    array: &GenericListViewArray<O>,
344    shift: usize,
345) {
346    let offsets = array.value_offsets();
347    let sizes = array.value_sizes();
348
349    lengths.iter_mut().enumerate().for_each(|(idx, length)| {
350        let size = sizes[idx].as_usize();
351        let range = array.is_valid(idx).then(|| {
352            // For empty lists (size=0), offset may be arbitrary and could underflow when shifted.
353            // Use 0 as start since the range is empty anyway.
354            let start = if size > 0 {
355                offsets[idx].as_usize() - shift
356            } else {
357                0
358            };
359            start..start + size
360        });
361        *length += list_element_encoded_len(rows, range);
362    });
363}
364
365/// Encodes the provided `GenericListViewArray` to `out` with the provided `SortOptions`
366///
367/// `rows` should contain the encoded child elements
368pub fn encode_list_view<O: OffsetSizeTrait>(
369    data: &mut [u8],
370    out_offsets: &mut [usize],
371    rows: &Rows,
372    opts: SortOptions,
373    array: &GenericListViewArray<O>,
374    shift: usize,
375) {
376    let offsets = array.value_offsets();
377    let sizes = array.value_sizes();
378
379    out_offsets
380        .iter_mut()
381        .skip(1)
382        .enumerate()
383        .for_each(|(idx, offset)| {
384            let size = sizes[idx].as_usize();
385            let range = array.is_valid(idx).then(|| {
386                // For empty lists (size=0), offset may be arbitrary and could underflow when shifted.
387                // Use 0 as start since the range is empty anyway.
388                let start = if size > 0 {
389                    offsets[idx].as_usize() - shift
390                } else {
391                    0
392                };
393                start..start + size
394            });
395            let out = &mut data[*offset..];
396            *offset += encode_one(out, rows, range, opts)
397        });
398}
399
400/// Decodes a `GenericListViewArray` from `rows` with the provided `options`
401///
402/// # Safety
403///
404/// `rows` must contain valid data for the provided `converter`
405pub unsafe fn decode_list_view<O: OffsetSizeTrait>(
406    converter: &RowConverter,
407    rows: &mut [&[u8]],
408    field: &SortField,
409    validate_utf8: bool,
410) -> Result<GenericListViewArray<O>, ArrowError> {
411    let opts = field.options;
412
413    let mut values_bytes = 0;
414
415    let mut child_count = 0usize;
416    let mut list_sizes: Vec<O> = Vec::with_capacity(rows.len());
417
418    // First pass: count children and compute sizes
419    for row in rows.iter_mut() {
420        let mut row_offset = 0;
421        let mut list_size = 0usize;
422        loop {
423            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
424                values_bytes += x.len();
425            });
426            if decoded <= 1 {
427                list_sizes.push(O::usize_as(list_size));
428                break;
429            }
430            row_offset += decoded;
431            child_count += 1;
432            list_size += 1;
433        }
434    }
435    O::from_usize(child_count).expect("overflow");
436
437    let mut null_count = 0;
438    let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
439        let valid = rows[x][0] != null_sentinel(opts);
440        null_count += !valid as usize;
441        valid
442    });
443
444    let mut values_offsets_vec = Vec::with_capacity(child_count);
445    let mut values_bytes = Vec::with_capacity(values_bytes);
446    for row in rows.iter_mut() {
447        let mut row_offset = 0;
448        loop {
449            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
450                values_bytes.extend_from_slice(x)
451            });
452            row_offset += decoded;
453            if decoded <= 1 {
454                break;
455            }
456            values_offsets_vec.push(values_bytes.len());
457        }
458        *row = &row[row_offset..];
459    }
460
461    if opts.descending {
462        values_bytes.iter_mut().for_each(|o| *o = !*o);
463    }
464
465    let mut last_value_offset = 0;
466    let mut child_rows: Vec<_> = values_offsets_vec
467        .into_iter()
468        .map(|offset| {
469            let v = &values_bytes[last_value_offset..offset];
470            last_value_offset = offset;
471            v
472        })
473        .collect();
474
475    let child = unsafe { converter.convert_raw(&mut child_rows, validate_utf8) }?;
476    assert_eq!(child.len(), 1);
477
478    let child_data = child[0].to_data();
479
480    // Technically ListViews don't have to have offsets follow each other precisely, but can be
481    // reused. However, because we cannot preserve that sharing within the row format, this is the
482    // best we can do.
483    let mut list_offsets: Vec<O> = Vec::with_capacity(rows.len());
484    let mut current_offset = O::usize_as(0);
485    for size in &list_sizes {
486        list_offsets.push(current_offset);
487        current_offset += *size;
488    }
489
490    // Since RowConverter flattens certain data types (i.e. Dictionary),
491    // we need to use updated data type instead of original field
492    let corrected_inner_field = match &field.data_type {
493        DataType::ListView(inner_field) | DataType::LargeListView(inner_field) => Arc::new(
494            inner_field
495                .as_ref()
496                .clone()
497                .with_data_type(child_data.data_type().clone()),
498        ),
499        _ => unreachable!(),
500    };
501
502    // SAFETY: null_count was computed correctly when building the nulls buffer above
503    let null_buffer = unsafe {
504        NullBuffer::new_unchecked(BooleanBuffer::new(nulls.into(), 0, rows.len()), null_count)
505    };
506
507    GenericListViewArray::try_new(
508        corrected_inner_field,
509        ScalarBuffer::from(list_offsets),
510        ScalarBuffer::from(list_sizes),
511        child[0].clone(),
512        Some(null_buffer).filter(|n| n.null_count() > 0),
513    )
514}