1use crate::{fixed, null_sentinel, LengthTracker, RowConverter, Rows, SortField};
19use arrow_array::{new_null_array, Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait};
20use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
21use arrow_data::ArrayDataBuilder;
22use arrow_schema::{ArrowError, DataType, SortOptions};
23use std::ops::Range;
24
25pub fn compute_lengths<O: OffsetSizeTrait>(
26 lengths: &mut [usize],
27 rows: &Rows,
28 array: &GenericListArray<O>,
29) {
30 let shift = array.value_offsets()[0].as_usize();
31
32 let offsets = array.value_offsets().windows(2);
33 lengths
34 .iter_mut()
35 .zip(offsets)
36 .enumerate()
37 .for_each(|(idx, (length, offsets))| {
38 let start = offsets[0].as_usize() - shift;
39 let end = offsets[1].as_usize() - shift;
40 let range = array.is_valid(idx).then_some(start..end);
41 *length += encoded_len(rows, range);
42 });
43}
44
45fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
46 match range {
47 None => 1,
48 Some(range) => {
49 1 + range
50 .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
51 .sum::<usize>()
52 }
53 }
54}
55
56pub fn encode<O: OffsetSizeTrait>(
60 data: &mut [u8],
61 offsets: &mut [usize],
62 rows: &Rows,
63 opts: SortOptions,
64 array: &GenericListArray<O>,
65) {
66 let shift = array.value_offsets()[0].as_usize();
67
68 offsets
69 .iter_mut()
70 .skip(1)
71 .zip(array.value_offsets().windows(2))
72 .enumerate()
73 .for_each(|(idx, (offset, offsets))| {
74 let start = offsets[0].as_usize() - shift;
75 let end = offsets[1].as_usize() - shift;
76 let range = array.is_valid(idx).then_some(start..end);
77 let out = &mut data[*offset..];
78 *offset += encode_one(out, rows, range, opts)
79 });
80}
81
82#[inline]
83fn encode_one(
84 out: &mut [u8],
85 rows: &Rows,
86 range: Option<Range<usize>>,
87 opts: SortOptions,
88) -> usize {
89 match range {
90 None => super::variable::encode_null(out, opts),
91 Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
92 Some(range) => {
93 let mut offset = 0;
94 for i in range {
95 let row = rows.row(i);
96 offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
97 }
98 offset += super::variable::encode_empty(&mut out[offset..], opts);
99 offset
100 }
101 }
102}
103
104pub unsafe fn decode<O: OffsetSizeTrait>(
110 converter: &RowConverter,
111 rows: &mut [&[u8]],
112 field: &SortField,
113 validate_utf8: bool,
114) -> Result<GenericListArray<O>, ArrowError> {
115 let opts = field.options;
116
117 let mut values_bytes = 0;
118
119 let mut offset = 0;
120 let mut offsets = Vec::with_capacity(rows.len() + 1);
121 offsets.push(O::usize_as(0));
122
123 for row in rows.iter_mut() {
124 let mut row_offset = 0;
125 loop {
126 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
127 values_bytes += x.len();
128 });
129 if decoded <= 1 {
130 offsets.push(O::usize_as(offset));
131 break;
132 }
133 row_offset += decoded;
134 offset += 1;
135 }
136 }
137 O::from_usize(offset).expect("overflow");
138
139 let mut null_count = 0;
140 let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
141 let valid = rows[x][0] != null_sentinel(opts);
142 null_count += !valid as usize;
143 valid
144 });
145
146 let mut values_offsets = Vec::with_capacity(offset);
147 let mut values_bytes = Vec::with_capacity(values_bytes);
148 for row in rows.iter_mut() {
149 let mut row_offset = 0;
150 loop {
151 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
152 values_bytes.extend_from_slice(x)
153 });
154 row_offset += decoded;
155 if decoded <= 1 {
156 break;
157 }
158 values_offsets.push(values_bytes.len());
159 }
160 *row = &row[row_offset..];
161 }
162
163 if opts.descending {
164 values_bytes.iter_mut().for_each(|o| *o = !*o);
165 }
166
167 let mut last_value_offset = 0;
168 let mut child_rows: Vec<_> = values_offsets
169 .into_iter()
170 .map(|offset| {
171 let v = &values_bytes[last_value_offset..offset];
172 last_value_offset = offset;
173 v
174 })
175 .collect();
176
177 let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
178 assert_eq!(child.len(), 1);
179
180 let child_data = child[0].to_data();
181
182 let builder = ArrayDataBuilder::new(field.data_type.clone())
183 .len(rows.len())
184 .null_count(null_count)
185 .null_bit_buffer(Some(nulls.into()))
186 .add_buffer(Buffer::from_vec(offsets))
187 .add_child_data(child_data);
188
189 Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
190}
191
192pub fn compute_lengths_fixed_size_list(
193 tracker: &mut LengthTracker,
194 rows: &Rows,
195 array: &FixedSizeListArray,
196) {
197 let value_length = array.value_length().as_usize();
198 tracker.push_variable((0..array.len()).map(|idx| {
199 match array.is_valid(idx) {
200 true => {
201 1 + ((idx * value_length)..(idx + 1) * value_length)
202 .map(|child_idx| rows.row(child_idx).as_ref().len())
203 .sum::<usize>()
204 }
205 false => 1,
206 }
207 }))
208}
209
210pub fn encode_fixed_size_list(
214 data: &mut [u8],
215 offsets: &mut [usize],
216 rows: &Rows,
217 opts: SortOptions,
218 array: &FixedSizeListArray,
219) {
220 let null_sentinel = null_sentinel(opts);
221 offsets
222 .iter_mut()
223 .skip(1)
224 .enumerate()
225 .for_each(|(idx, offset)| {
226 let value_length = array.value_length().as_usize();
227 match array.is_valid(idx) {
228 true => {
229 data[*offset] = 0x01;
230 *offset += 1;
231 for child_idx in (idx * value_length)..(idx + 1) * value_length {
232 let row = rows.row(child_idx);
233 let end_offset = *offset + row.as_ref().len();
234 data[*offset..end_offset].copy_from_slice(row.as_ref());
235 *offset = end_offset;
236 }
237 }
238 false => {
239 data[*offset] = null_sentinel;
240 *offset += 1;
241 }
242 };
243 })
244}
245
246pub unsafe fn decode_fixed_size_list(
252 converter: &RowConverter,
253 rows: &mut [&[u8]],
254 field: &SortField,
255 validate_utf8: bool,
256 value_length: usize,
257) -> Result<FixedSizeListArray, ArrowError> {
258 let list_type = &field.data_type;
259 let element_type = match list_type {
260 DataType::FixedSizeList(element_field, _) => element_field.data_type(),
261 _ => {
262 return Err(ArrowError::InvalidArgumentError(format!(
263 "Expected FixedSizeListArray, found: {list_type:?}",
264 )))
265 }
266 };
267
268 let len = rows.len();
269 let (null_count, nulls) = fixed::decode_nulls(rows);
270
271 let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?;
272 let null_element_encoded = null_element_encoded.row(0);
273 let null_element_slice = null_element_encoded.as_ref();
274
275 let mut child_rows = Vec::new();
276 for row in rows {
277 let valid = row[0] == 1;
278 let mut row_offset = 1;
279 if !valid {
280 for _ in 0..value_length {
281 child_rows.push(null_element_slice);
282 }
283 } else {
284 for _ in 0..value_length {
285 let mut temp_child_rows = vec![&row[row_offset..]];
286 converter.convert_raw(&mut temp_child_rows, validate_utf8)?;
287 let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
288 let next_offset = row_offset + decoded_bytes;
289 child_rows.push(&row[row_offset..next_offset]);
290 row_offset = next_offset;
291 }
292 }
293 *row = &row[row_offset..]; }
295
296 let children = converter.convert_raw(&mut child_rows, validate_utf8)?;
297 let child_data = children.iter().map(|c| c.to_data()).collect();
298 let builder = ArrayDataBuilder::new(list_type.clone())
299 .len(len)
300 .null_count(null_count)
301 .null_bit_buffer(Some(nulls))
302 .child_data(child_data);
303
304 Ok(FixedSizeListArray::from(builder.build_unchecked()))
305}