arrow_row/
list.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{fixed, null_sentinel, LengthTracker, RowConverter, Rows, SortField};
19use arrow_array::{new_null_array, Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait};
20use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
21use arrow_data::ArrayDataBuilder;
22use arrow_schema::{ArrowError, DataType, SortOptions};
23use std::{ops::Range, sync::Arc};
24
25pub fn compute_lengths<O: OffsetSizeTrait>(
26    lengths: &mut [usize],
27    rows: &Rows,
28    array: &GenericListArray<O>,
29) {
30    let shift = array.value_offsets()[0].as_usize();
31
32    let offsets = array.value_offsets().windows(2);
33    lengths
34        .iter_mut()
35        .zip(offsets)
36        .enumerate()
37        .for_each(|(idx, (length, offsets))| {
38            let start = offsets[0].as_usize() - shift;
39            let end = offsets[1].as_usize() - shift;
40            let range = array.is_valid(idx).then_some(start..end);
41            *length += encoded_len(rows, range);
42        });
43}
44
45fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
46    match range {
47        None => 1,
48        Some(range) => {
49            1 + range
50                .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
51                .sum::<usize>()
52        }
53    }
54}
55
56/// Encodes the provided `GenericListArray` to `out` with the provided `SortOptions`
57///
58/// `rows` should contain the encoded child elements
59pub fn encode<O: OffsetSizeTrait>(
60    data: &mut [u8],
61    offsets: &mut [usize],
62    rows: &Rows,
63    opts: SortOptions,
64    array: &GenericListArray<O>,
65) {
66    let shift = array.value_offsets()[0].as_usize();
67
68    offsets
69        .iter_mut()
70        .skip(1)
71        .zip(array.value_offsets().windows(2))
72        .enumerate()
73        .for_each(|(idx, (offset, offsets))| {
74            let start = offsets[0].as_usize() - shift;
75            let end = offsets[1].as_usize() - shift;
76            let range = array.is_valid(idx).then_some(start..end);
77            let out = &mut data[*offset..];
78            *offset += encode_one(out, rows, range, opts)
79        });
80}
81
82#[inline]
83fn encode_one(
84    out: &mut [u8],
85    rows: &Rows,
86    range: Option<Range<usize>>,
87    opts: SortOptions,
88) -> usize {
89    match range {
90        None => super::variable::encode_null(out, opts),
91        Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
92        Some(range) => {
93            let mut offset = 0;
94            for i in range {
95                let row = rows.row(i);
96                offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
97            }
98            offset += super::variable::encode_empty(&mut out[offset..], opts);
99            offset
100        }
101    }
102}
103
104/// Decodes an array from `rows` with the provided `options`
105///
106/// # Safety
107///
108/// `rows` must contain valid data for the provided `converter`
109pub unsafe fn decode<O: OffsetSizeTrait>(
110    converter: &RowConverter,
111    rows: &mut [&[u8]],
112    field: &SortField,
113    validate_utf8: bool,
114) -> Result<GenericListArray<O>, ArrowError> {
115    let opts = field.options;
116
117    let mut values_bytes = 0;
118
119    let mut offset = 0;
120    let mut offsets = Vec::with_capacity(rows.len() + 1);
121    offsets.push(O::usize_as(0));
122
123    for row in rows.iter_mut() {
124        let mut row_offset = 0;
125        loop {
126            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
127                values_bytes += x.len();
128            });
129            if decoded <= 1 {
130                offsets.push(O::usize_as(offset));
131                break;
132            }
133            row_offset += decoded;
134            offset += 1;
135        }
136    }
137    O::from_usize(offset).expect("overflow");
138
139    let mut null_count = 0;
140    let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
141        let valid = rows[x][0] != null_sentinel(opts);
142        null_count += !valid as usize;
143        valid
144    });
145
146    let mut values_offsets = Vec::with_capacity(offset);
147    let mut values_bytes = Vec::with_capacity(values_bytes);
148    for row in rows.iter_mut() {
149        let mut row_offset = 0;
150        loop {
151            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
152                values_bytes.extend_from_slice(x)
153            });
154            row_offset += decoded;
155            if decoded <= 1 {
156                break;
157            }
158            values_offsets.push(values_bytes.len());
159        }
160        *row = &row[row_offset..];
161    }
162
163    if opts.descending {
164        values_bytes.iter_mut().for_each(|o| *o = !*o);
165    }
166
167    let mut last_value_offset = 0;
168    let mut child_rows: Vec<_> = values_offsets
169        .into_iter()
170        .map(|offset| {
171            let v = &values_bytes[last_value_offset..offset];
172            last_value_offset = offset;
173            v
174        })
175        .collect();
176
177    let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
178    assert_eq!(child.len(), 1);
179
180    let child_data = child[0].to_data();
181
182    // Since RowConverter flattens certain data types (i.e. Dictionary),
183    // we need to use updated data type instead of original field
184    let corrected_type = match &field.data_type {
185        DataType::List(inner_field) => DataType::List(Arc::new(
186            inner_field
187                .as_ref()
188                .clone()
189                .with_data_type(child_data.data_type().clone()),
190        )),
191        DataType::LargeList(inner_field) => DataType::LargeList(Arc::new(
192            inner_field
193                .as_ref()
194                .clone()
195                .with_data_type(child_data.data_type().clone()),
196        )),
197        _ => unreachable!(),
198    };
199
200    let builder = ArrayDataBuilder::new(corrected_type)
201        .len(rows.len())
202        .null_count(null_count)
203        .null_bit_buffer(Some(nulls.into()))
204        .add_buffer(Buffer::from_vec(offsets))
205        .add_child_data(child_data);
206
207    Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
208}
209
210pub fn compute_lengths_fixed_size_list(
211    tracker: &mut LengthTracker,
212    rows: &Rows,
213    array: &FixedSizeListArray,
214) {
215    let value_length = array.value_length().as_usize();
216    tracker.push_variable((0..array.len()).map(|idx| {
217        match array.is_valid(idx) {
218            true => {
219                1 + ((idx * value_length)..(idx + 1) * value_length)
220                    .map(|child_idx| rows.row(child_idx).as_ref().len())
221                    .sum::<usize>()
222            }
223            false => 1,
224        }
225    }))
226}
227
228/// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions`
229///
230/// `rows` should contain the encoded child elements
231pub fn encode_fixed_size_list(
232    data: &mut [u8],
233    offsets: &mut [usize],
234    rows: &Rows,
235    opts: SortOptions,
236    array: &FixedSizeListArray,
237) {
238    let null_sentinel = null_sentinel(opts);
239    offsets
240        .iter_mut()
241        .skip(1)
242        .enumerate()
243        .for_each(|(idx, offset)| {
244            let value_length = array.value_length().as_usize();
245            match array.is_valid(idx) {
246                true => {
247                    data[*offset] = 0x01;
248                    *offset += 1;
249                    for child_idx in (idx * value_length)..(idx + 1) * value_length {
250                        let row = rows.row(child_idx);
251                        let end_offset = *offset + row.as_ref().len();
252                        data[*offset..end_offset].copy_from_slice(row.as_ref());
253                        *offset = end_offset;
254                    }
255                }
256                false => {
257                    data[*offset] = null_sentinel;
258                    *offset += 1;
259                }
260            };
261        })
262}
263
264/// Decodes a fixed size list array from `rows` with the provided `options`
265///
266/// # Safety
267///
268/// `rows` must contain valid data for the provided `converter`
269pub unsafe fn decode_fixed_size_list(
270    converter: &RowConverter,
271    rows: &mut [&[u8]],
272    field: &SortField,
273    validate_utf8: bool,
274    value_length: usize,
275) -> Result<FixedSizeListArray, ArrowError> {
276    let list_type = &field.data_type;
277    let element_type = match list_type {
278        DataType::FixedSizeList(element_field, _) => element_field.data_type(),
279        _ => {
280            return Err(ArrowError::InvalidArgumentError(format!(
281                "Expected FixedSizeListArray, found: {list_type:?}",
282            )))
283        }
284    };
285
286    let len = rows.len();
287    let (null_count, nulls) = fixed::decode_nulls(rows);
288
289    let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?;
290    let null_element_encoded = null_element_encoded.row(0);
291    let null_element_slice = null_element_encoded.as_ref();
292
293    let mut child_rows = Vec::new();
294    for row in rows {
295        let valid = row[0] == 1;
296        let mut row_offset = 1;
297        if !valid {
298            for _ in 0..value_length {
299                child_rows.push(null_element_slice);
300            }
301        } else {
302            for _ in 0..value_length {
303                let mut temp_child_rows = vec![&row[row_offset..]];
304                converter.convert_raw(&mut temp_child_rows, validate_utf8)?;
305                let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
306                let next_offset = row_offset + decoded_bytes;
307                child_rows.push(&row[row_offset..next_offset]);
308                row_offset = next_offset;
309            }
310        }
311        *row = &row[row_offset..]; // Update row for the next decoder
312    }
313
314    let children = converter.convert_raw(&mut child_rows, validate_utf8)?;
315    let child_data = children.iter().map(|c| c.to_data()).collect();
316    let builder = ArrayDataBuilder::new(list_type.clone())
317        .len(len)
318        .null_count(null_count)
319        .null_bit_buffer(Some(nulls))
320        .child_data(child_data);
321
322    Ok(FixedSizeListArray::from(builder.build_unchecked()))
323}