arrow_row/
list.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{fixed, null_sentinel, LengthTracker, RowConverter, Rows, SortField};
19use arrow_array::{new_null_array, Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait};
20use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
21use arrow_data::ArrayDataBuilder;
22use arrow_schema::{ArrowError, DataType, SortOptions};
23use std::ops::Range;
24
25pub fn compute_lengths<O: OffsetSizeTrait>(
26    lengths: &mut [usize],
27    rows: &Rows,
28    array: &GenericListArray<O>,
29) {
30    let shift = array.value_offsets()[0].as_usize();
31
32    let offsets = array.value_offsets().windows(2);
33    lengths
34        .iter_mut()
35        .zip(offsets)
36        .enumerate()
37        .for_each(|(idx, (length, offsets))| {
38            let start = offsets[0].as_usize() - shift;
39            let end = offsets[1].as_usize() - shift;
40            let range = array.is_valid(idx).then_some(start..end);
41            *length += encoded_len(rows, range);
42        });
43}
44
45fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
46    match range {
47        None => 1,
48        Some(range) => {
49            1 + range
50                .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
51                .sum::<usize>()
52        }
53    }
54}
55
56/// Encodes the provided `GenericListArray` to `out` with the provided `SortOptions`
57///
58/// `rows` should contain the encoded child elements
59pub fn encode<O: OffsetSizeTrait>(
60    data: &mut [u8],
61    offsets: &mut [usize],
62    rows: &Rows,
63    opts: SortOptions,
64    array: &GenericListArray<O>,
65) {
66    let shift = array.value_offsets()[0].as_usize();
67
68    offsets
69        .iter_mut()
70        .skip(1)
71        .zip(array.value_offsets().windows(2))
72        .enumerate()
73        .for_each(|(idx, (offset, offsets))| {
74            let start = offsets[0].as_usize() - shift;
75            let end = offsets[1].as_usize() - shift;
76            let range = array.is_valid(idx).then_some(start..end);
77            let out = &mut data[*offset..];
78            *offset += encode_one(out, rows, range, opts)
79        });
80}
81
82#[inline]
83fn encode_one(
84    out: &mut [u8],
85    rows: &Rows,
86    range: Option<Range<usize>>,
87    opts: SortOptions,
88) -> usize {
89    match range {
90        None => super::variable::encode_null(out, opts),
91        Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
92        Some(range) => {
93            let mut offset = 0;
94            for i in range {
95                let row = rows.row(i);
96                offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
97            }
98            offset += super::variable::encode_empty(&mut out[offset..], opts);
99            offset
100        }
101    }
102}
103
104/// Decodes an array from `rows` with the provided `options`
105///
106/// # Safety
107///
108/// `rows` must contain valid data for the provided `converter`
109pub unsafe fn decode<O: OffsetSizeTrait>(
110    converter: &RowConverter,
111    rows: &mut [&[u8]],
112    field: &SortField,
113    validate_utf8: bool,
114) -> Result<GenericListArray<O>, ArrowError> {
115    let opts = field.options;
116
117    let mut values_bytes = 0;
118
119    let mut offset = 0;
120    let mut offsets = Vec::with_capacity(rows.len() + 1);
121    offsets.push(O::usize_as(0));
122
123    for row in rows.iter_mut() {
124        let mut row_offset = 0;
125        loop {
126            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
127                values_bytes += x.len();
128            });
129            if decoded <= 1 {
130                offsets.push(O::usize_as(offset));
131                break;
132            }
133            row_offset += decoded;
134            offset += 1;
135        }
136    }
137    O::from_usize(offset).expect("overflow");
138
139    let mut null_count = 0;
140    let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
141        let valid = rows[x][0] != null_sentinel(opts);
142        null_count += !valid as usize;
143        valid
144    });
145
146    let mut values_offsets = Vec::with_capacity(offset);
147    let mut values_bytes = Vec::with_capacity(values_bytes);
148    for row in rows.iter_mut() {
149        let mut row_offset = 0;
150        loop {
151            let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
152                values_bytes.extend_from_slice(x)
153            });
154            row_offset += decoded;
155            if decoded <= 1 {
156                break;
157            }
158            values_offsets.push(values_bytes.len());
159        }
160        *row = &row[row_offset..];
161    }
162
163    if opts.descending {
164        values_bytes.iter_mut().for_each(|o| *o = !*o);
165    }
166
167    let mut last_value_offset = 0;
168    let mut child_rows: Vec<_> = values_offsets
169        .into_iter()
170        .map(|offset| {
171            let v = &values_bytes[last_value_offset..offset];
172            last_value_offset = offset;
173            v
174        })
175        .collect();
176
177    let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
178    assert_eq!(child.len(), 1);
179
180    let child_data = child[0].to_data();
181
182    let builder = ArrayDataBuilder::new(field.data_type.clone())
183        .len(rows.len())
184        .null_count(null_count)
185        .null_bit_buffer(Some(nulls.into()))
186        .add_buffer(Buffer::from_vec(offsets))
187        .add_child_data(child_data);
188
189    Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
190}
191
192pub fn compute_lengths_fixed_size_list(
193    tracker: &mut LengthTracker,
194    rows: &Rows,
195    array: &FixedSizeListArray,
196) {
197    let value_length = array.value_length().as_usize();
198    tracker.push_variable((0..array.len()).map(|idx| {
199        match array.is_valid(idx) {
200            true => {
201                1 + ((idx * value_length)..(idx + 1) * value_length)
202                    .map(|child_idx| rows.row(child_idx).as_ref().len())
203                    .sum::<usize>()
204            }
205            false => 1,
206        }
207    }))
208}
209
210/// Encodes the provided `FixedSizeListArray` to `out` with the provided `SortOptions`
211///
212/// `rows` should contain the encoded child elements
213pub fn encode_fixed_size_list(
214    data: &mut [u8],
215    offsets: &mut [usize],
216    rows: &Rows,
217    opts: SortOptions,
218    array: &FixedSizeListArray,
219) {
220    let null_sentinel = null_sentinel(opts);
221    offsets
222        .iter_mut()
223        .skip(1)
224        .enumerate()
225        .for_each(|(idx, offset)| {
226            let value_length = array.value_length().as_usize();
227            match array.is_valid(idx) {
228                true => {
229                    data[*offset] = 0x01;
230                    *offset += 1;
231                    for child_idx in (idx * value_length)..(idx + 1) * value_length {
232                        let row = rows.row(child_idx);
233                        let end_offset = *offset + row.as_ref().len();
234                        data[*offset..end_offset].copy_from_slice(row.as_ref());
235                        *offset = end_offset;
236                    }
237                }
238                false => {
239                    data[*offset] = null_sentinel;
240                    *offset += 1;
241                }
242            };
243        })
244}
245
246/// Decodes a fixed size list array from `rows` with the provided `options`
247///
248/// # Safety
249///
250/// `rows` must contain valid data for the provided `converter`
251pub unsafe fn decode_fixed_size_list(
252    converter: &RowConverter,
253    rows: &mut [&[u8]],
254    field: &SortField,
255    validate_utf8: bool,
256    value_length: usize,
257) -> Result<FixedSizeListArray, ArrowError> {
258    let list_type = &field.data_type;
259    let element_type = match list_type {
260        DataType::FixedSizeList(element_field, _) => element_field.data_type(),
261        _ => {
262            return Err(ArrowError::InvalidArgumentError(format!(
263                "Expected FixedSizeListArray, found: {list_type:?}",
264            )))
265        }
266    };
267
268    let len = rows.len();
269    let (null_count, nulls) = fixed::decode_nulls(rows);
270
271    let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?;
272    let null_element_encoded = null_element_encoded.row(0);
273    let null_element_slice = null_element_encoded.as_ref();
274
275    let mut child_rows = Vec::new();
276    for row in rows {
277        let valid = row[0] == 1;
278        let mut row_offset = 1;
279        if !valid {
280            for _ in 0..value_length {
281                child_rows.push(null_element_slice);
282            }
283        } else {
284            for _ in 0..value_length {
285                let mut temp_child_rows = vec![&row[row_offset..]];
286                converter.convert_raw(&mut temp_child_rows, validate_utf8)?;
287                let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
288                let next_offset = row_offset + decoded_bytes;
289                child_rows.push(&row[row_offset..next_offset]);
290                row_offset = next_offset;
291            }
292        }
293        *row = &row[row_offset..]; // Update row for the next decoder
294    }
295
296    let children = converter.convert_raw(&mut child_rows, validate_utf8)?;
297    let child_data = children.iter().map(|c| c.to_data()).collect();
298    let builder = ArrayDataBuilder::new(list_type.clone())
299        .len(len)
300        .null_count(null_count)
301        .null_bit_buffer(Some(nulls))
302        .child_data(child_data);
303
304    Ok(FixedSizeListArray::from(builder.build_unchecked()))
305}