1use crate::{null_sentinel, RowConverter, Rows, SortField};
19use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
20use arrow_buffer::{Buffer, MutableBuffer};
21use arrow_data::ArrayDataBuilder;
22use arrow_schema::{ArrowError, SortOptions};
23use std::ops::Range;
24
25pub fn compute_lengths<O: OffsetSizeTrait>(
26 lengths: &mut [usize],
27 rows: &Rows,
28 array: &GenericListArray<O>,
29) {
30 let offsets = array.value_offsets().windows(2);
31 lengths
32 .iter_mut()
33 .zip(offsets)
34 .enumerate()
35 .for_each(|(idx, (length, offsets))| {
36 let start = offsets[0].as_usize();
37 let end = offsets[1].as_usize();
38 let range = array.is_valid(idx).then_some(start..end);
39 *length += encoded_len(rows, range);
40 });
41}
42
43fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
44 match range {
45 None => 1,
46 Some(range) => {
47 1 + range
48 .map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
49 .sum::<usize>()
50 }
51 }
52}
53
54pub fn encode<O: OffsetSizeTrait>(
58 data: &mut [u8],
59 offsets: &mut [usize],
60 rows: &Rows,
61 opts: SortOptions,
62 array: &GenericListArray<O>,
63) {
64 offsets
65 .iter_mut()
66 .skip(1)
67 .zip(array.value_offsets().windows(2))
68 .enumerate()
69 .for_each(|(idx, (offset, offsets))| {
70 let start = offsets[0].as_usize();
71 let end = offsets[1].as_usize();
72 let range = array.is_valid(idx).then_some(start..end);
73 let out = &mut data[*offset..];
74 *offset += encode_one(out, rows, range, opts)
75 });
76}
77
78#[inline]
79fn encode_one(
80 out: &mut [u8],
81 rows: &Rows,
82 range: Option<Range<usize>>,
83 opts: SortOptions,
84) -> usize {
85 match range {
86 None => super::variable::encode_null(out, opts),
87 Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
88 Some(range) => {
89 let mut offset = 0;
90 for i in range {
91 let row = rows.row(i);
92 offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
93 }
94 offset += super::variable::encode_empty(&mut out[offset..], opts);
95 offset
96 }
97 }
98}
99
100pub unsafe fn decode<O: OffsetSizeTrait>(
106 converter: &RowConverter,
107 rows: &mut [&[u8]],
108 field: &SortField,
109 validate_utf8: bool,
110) -> Result<GenericListArray<O>, ArrowError> {
111 let opts = field.options;
112
113 let mut values_bytes = 0;
114
115 let mut offset = 0;
116 let mut offsets = Vec::with_capacity(rows.len() + 1);
117 offsets.push(O::usize_as(0));
118
119 for row in rows.iter_mut() {
120 let mut row_offset = 0;
121 loop {
122 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
123 values_bytes += x.len();
124 });
125 if decoded <= 1 {
126 offsets.push(O::usize_as(offset));
127 break;
128 }
129 row_offset += decoded;
130 offset += 1;
131 }
132 }
133 O::from_usize(offset).expect("overflow");
134
135 let mut null_count = 0;
136 let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
137 let valid = rows[x][0] != null_sentinel(opts);
138 null_count += !valid as usize;
139 valid
140 });
141
142 let mut values_offsets = Vec::with_capacity(offset);
143 let mut values_bytes = Vec::with_capacity(values_bytes);
144 for row in rows.iter_mut() {
145 let mut row_offset = 0;
146 loop {
147 let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
148 values_bytes.extend_from_slice(x)
149 });
150 row_offset += decoded;
151 if decoded <= 1 {
152 break;
153 }
154 values_offsets.push(values_bytes.len());
155 }
156 *row = &row[row_offset..];
157 }
158
159 if opts.descending {
160 values_bytes.iter_mut().for_each(|o| *o = !*o);
161 }
162
163 let mut last_value_offset = 0;
164 let mut child_rows: Vec<_> = values_offsets
165 .into_iter()
166 .map(|offset| {
167 let v = &values_bytes[last_value_offset..offset];
168 last_value_offset = offset;
169 v
170 })
171 .collect();
172
173 let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
174 assert_eq!(child.len(), 1);
175
176 let child_data = child[0].to_data();
177
178 let builder = ArrayDataBuilder::new(field.data_type.clone())
179 .len(rows.len())
180 .null_count(null_count)
181 .null_bit_buffer(Some(nulls.into()))
182 .add_buffer(Buffer::from_vec(offsets))
183 .add_child_data(child_data);
184
185 Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
186}