1use crate::{fixed, null_sentinel, LengthTracker, RowConverter, Rows, SortField};
19use arrow_array::{new_null_array, Array, FixedSizeListArray, GenericListArray, OffsetSizeTrait};
20use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
21use arrow_data::ArrayDataBuilder;
22use arrow_schema::{ArrowError, DataType, SortOptions};
23use std::ops::Range;
24
25pub fn compute_lengths<O: OffsetSizeTrait>(
26 lengths: &mut [usize],
27 rows: &Rows,
28 array: &GenericListArray<O>,
29) {
30 let offsets = array.value_offsets().windows(2);
31 lengths
32 .iter_mut()
33 .zip(offsets)
34 .enumerate()
35 .for_each(|(idx, (length, offsets))| {
36 let start = offsets[0].as_usize();
37 let end = offsets[1].as_usize();
38 let range = array.is_valid(idx).then_some(start..end);
39 *length += encoded_len(rows, range);
40 });
41}
42
43fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
44 match range {
45 None => 1,
46 Some(range) => {
47 1 + range
48 .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
49 .sum::<usize>()
50 }
51 }
52}
53
54pub fn encode<O: OffsetSizeTrait>(
58 data: &mut [u8],
59 offsets: &mut [usize],
60 rows: &Rows,
61 opts: SortOptions,
62 array: &GenericListArray<O>,
63) {
64 offsets
65 .iter_mut()
66 .skip(1)
67 .zip(array.value_offsets().windows(2))
68 .enumerate()
69 .for_each(|(idx, (offset, offsets))| {
70 let start = offsets[0].as_usize();
71 let end = offsets[1].as_usize();
72 let range = array.is_valid(idx).then_some(start..end);
73 let out = &mut data[*offset..];
74 *offset += encode_one(out, rows, range, opts)
75 });
76}
77
78#[inline]
79fn encode_one(
80 out: &mut [u8],
81 rows: &Rows,
82 range: Option<Range<usize>>,
83 opts: SortOptions,
84) -> usize {
85 match range {
86 None => super::variable::encode_null(out, opts),
87 Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
88 Some(range) => {
89 let mut offset = 0;
90 for i in range {
91 let row = rows.row(i);
92 offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
93 }
94 offset += super::variable::encode_empty(&mut out[offset..], opts);
95 offset
96 }
97 }
98}
99
100pub unsafe fn decode<O: OffsetSizeTrait>(
106 converter: &RowConverter,
107 rows: &mut [&[u8]],
108 field: &SortField,
109 validate_utf8: bool,
110) -> Result<GenericListArray<O>, ArrowError> {
111 let opts = field.options;
112
113 let mut values_bytes = 0;
114
115 let mut offset = 0;
116 let mut offsets = Vec::with_capacity(rows.len() + 1);
117 offsets.push(O::usize_as(0));
118
119 for row in rows.iter_mut() {
120 let mut row_offset = 0;
121 loop {
122 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
123 values_bytes += x.len();
124 });
125 if decoded <= 1 {
126 offsets.push(O::usize_as(offset));
127 break;
128 }
129 row_offset += decoded;
130 offset += 1;
131 }
132 }
133 O::from_usize(offset).expect("overflow");
134
135 let mut null_count = 0;
136 let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
137 let valid = rows[x][0] != null_sentinel(opts);
138 null_count += !valid as usize;
139 valid
140 });
141
142 let mut values_offsets = Vec::with_capacity(offset);
143 let mut values_bytes = Vec::with_capacity(values_bytes);
144 for row in rows.iter_mut() {
145 let mut row_offset = 0;
146 loop {
147 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
148 values_bytes.extend_from_slice(x)
149 });
150 row_offset += decoded;
151 if decoded <= 1 {
152 break;
153 }
154 values_offsets.push(values_bytes.len());
155 }
156 *row = &row[row_offset..];
157 }
158
159 if opts.descending {
160 values_bytes.iter_mut().for_each(|o| *o = !*o);
161 }
162
163 let mut last_value_offset = 0;
164 let mut child_rows: Vec<_> = values_offsets
165 .into_iter()
166 .map(|offset| {
167 let v = &values_bytes[last_value_offset..offset];
168 last_value_offset = offset;
169 v
170 })
171 .collect();
172
173 let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
174 assert_eq!(child.len(), 1);
175
176 let child_data = child[0].to_data();
177
178 let builder = ArrayDataBuilder::new(field.data_type.clone())
179 .len(rows.len())
180 .null_count(null_count)
181 .null_bit_buffer(Some(nulls.into()))
182 .add_buffer(Buffer::from_vec(offsets))
183 .add_child_data(child_data);
184
185 Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
186}
187
188pub fn compute_lengths_fixed_size_list(
189 tracker: &mut LengthTracker,
190 rows: &Rows,
191 array: &FixedSizeListArray,
192) {
193 let value_length = array.value_length().as_usize();
194 tracker.push_variable((0..array.len()).map(|idx| {
195 match array.is_valid(idx) {
196 true => {
197 1 + ((idx * value_length)..(idx + 1) * value_length)
198 .map(|child_idx| rows.row(child_idx).as_ref().len())
199 .sum::<usize>()
200 }
201 false => 1,
202 }
203 }))
204}
205
206pub fn encode_fixed_size_list(
210 data: &mut [u8],
211 offsets: &mut [usize],
212 rows: &Rows,
213 opts: SortOptions,
214 array: &FixedSizeListArray,
215) {
216 let null_sentinel = null_sentinel(opts);
217 offsets
218 .iter_mut()
219 .skip(1)
220 .enumerate()
221 .for_each(|(idx, offset)| {
222 let value_length = array.value_length().as_usize();
223 match array.is_valid(idx) {
224 true => {
225 data[*offset] = 0x01;
226 *offset += 1;
227 for child_idx in (idx * value_length)..(idx + 1) * value_length {
228 let row = rows.row(child_idx);
230 let end_offset = *offset + row.as_ref().len();
231 data[*offset..end_offset].copy_from_slice(row.as_ref());
232 *offset = end_offset;
233 }
234 }
235 false => {
236 let null_sentinels = 1;
237 for i in 0..null_sentinels {
239 data[*offset + i] = null_sentinel;
240 }
241 *offset += null_sentinels;
242 }
243 };
244 })
245}
246
247pub unsafe fn decode_fixed_size_list(
253 converter: &RowConverter,
254 rows: &mut [&[u8]],
255 field: &SortField,
256 validate_utf8: bool,
257 value_length: usize,
258) -> Result<FixedSizeListArray, ArrowError> {
259 let list_type = &field.data_type;
260 let element_type = match list_type {
261 DataType::FixedSizeList(element_field, _) => element_field.data_type(),
262 _ => {
263 return Err(ArrowError::InvalidArgumentError(format!(
264 "Expected FixedSizeListArray, found: {list_type:?}",
265 )))
266 }
267 };
268
269 let len = rows.len();
270 let (null_count, nulls) = fixed::decode_nulls(rows);
271
272 let null_element_encoded = converter.convert_columns(&[new_null_array(element_type, 1)])?;
273 let null_element_encoded = null_element_encoded.row(0);
274 let null_element_slice = null_element_encoded.as_ref();
275
276 let mut child_rows = Vec::new();
277 for row in rows {
278 let valid = row[0] == 1;
279 let mut row_offset = 1;
280 if !valid {
281 for _ in 0..value_length {
282 child_rows.push(null_element_slice);
283 }
284 } else {
285 for _ in 0..value_length {
286 let mut temp_child_rows = vec![&row[row_offset..]];
287 converter.convert_raw(&mut temp_child_rows, validate_utf8)?;
288 let decoded_bytes = row.len() - row_offset - temp_child_rows[0].len();
289 let next_offset = row_offset + decoded_bytes;
290 child_rows.push(&row[row_offset..next_offset]);
291 row_offset = next_offset;
292 }
293 }
294 }
295
296 let children = converter.convert_raw(&mut child_rows, validate_utf8)?;
297 let child_data = children.iter().map(|c| c.to_data()).collect();
298 let builder = ArrayDataBuilder::new(list_type.clone())
299 .len(len)
300 .null_count(null_count)
301 .null_bit_buffer(Some(nulls))
302 .child_data(child_data);
303
304 Ok(FixedSizeListArray::from(builder.build_unchecked()))
305}