1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::buffer::ValuesBuffer;
22use crate::arrow::record_reader::GenericRecordReader;
23use crate::arrow::schema::parquet_to_arrow_field;
24use crate::basic::{Encoding, Type};
25use crate::column::page::PageIterator;
26use crate::column::reader::decoder::ColumnValueDecoder;
27use crate::errors::{ParquetError, Result};
28use crate::schema::types::ColumnDescPtr;
29use arrow_array::{
30 ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
31 IntervalDayTimeArray, IntervalYearMonthArray,
32};
33use arrow_buffer::{i256, Buffer, IntervalDayTime};
34use arrow_data::ArrayDataBuilder;
35use arrow_schema::{DataType as ArrowType, IntervalUnit};
36use bytes::Bytes;
37use half::f16;
38use std::any::Any;
39use std::ops::Range;
40use std::sync::Arc;
41
42pub fn make_fixed_len_byte_array_reader(
44 pages: Box<dyn PageIterator>,
45 column_desc: ColumnDescPtr,
46 arrow_type: Option<ArrowType>,
47) -> Result<Box<dyn ArrayReader>> {
48 let data_type = match arrow_type {
50 Some(t) => t,
51 None => parquet_to_arrow_field(column_desc.as_ref())?
52 .data_type()
53 .clone(),
54 };
55
56 let byte_length = match column_desc.physical_type() {
57 Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
58 t => {
59 return Err(general_err!(
60 "invalid physical type for fixed length byte array reader - {}",
61 t
62 ))
63 }
64 };
65 match &data_type {
66 ArrowType::FixedSizeBinary(_) => {}
67 ArrowType::Decimal128(_, _) => {
68 if byte_length > 16 {
69 return Err(general_err!(
70 "decimal 128 type too large, must be less than 16 bytes, got {}",
71 byte_length
72 ));
73 }
74 }
75 ArrowType::Decimal256(_, _) => {
76 if byte_length > 32 {
77 return Err(general_err!(
78 "decimal 256 type too large, must be less than 32 bytes, got {}",
79 byte_length
80 ));
81 }
82 }
83 ArrowType::Interval(_) => {
84 if byte_length != 12 {
85 return Err(general_err!(
87 "interval type must consist of 12 bytes got {}",
88 byte_length
89 ));
90 }
91 }
92 ArrowType::Float16 => {
93 if byte_length != 2 {
94 return Err(general_err!(
95 "float 16 type must be 2 bytes, got {}",
96 byte_length
97 ));
98 }
99 }
100 _ => {
101 return Err(general_err!(
102 "invalid data type for fixed length byte array reader - {}",
103 data_type
104 ))
105 }
106 }
107
108 Ok(Box::new(FixedLenByteArrayReader::new(
109 pages,
110 column_desc,
111 data_type,
112 byte_length,
113 )))
114}
115
/// An [`ArrayReader`] for fixed-length byte arrays
struct FixedLenByteArrayReader {
    /// The target arrow type produced by [`ArrayReader::consume_batch`]
    data_type: ArrowType,
    /// The fixed size, in bytes, of each value
    byte_length: usize,
    /// The column's pages of encoded data
    pages: Box<dyn PageIterator>,
    /// Definition levels from the most recently consumed batch, if any
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels from the most recently consumed batch, if any
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates decoded values and levels across `read_records` calls
    record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
}
124
125impl FixedLenByteArrayReader {
126 fn new(
127 pages: Box<dyn PageIterator>,
128 column_desc: ColumnDescPtr,
129 data_type: ArrowType,
130 byte_length: usize,
131 ) -> Self {
132 Self {
133 data_type,
134 byte_length,
135 pages,
136 def_levels_buffer: None,
137 rep_levels_buffer: None,
138 record_reader: GenericRecordReader::new(column_desc),
139 }
140 }
141}
142
impl ArrayReader for FixedLenByteArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads up to `batch_size` records into the internal record reader,
    /// returning the number of records read
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    /// Converts the records buffered so far into an [`ArrayRef`] of
    /// `self.data_type`, then resets the reader for the next batch
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let record_data = self.record_reader.consume_record_data();

        // First assemble the raw bytes into a FixedSizeBinary array
        let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
            .len(self.record_reader.num_values())
            .add_buffer(Buffer::from_vec(record_data.buffer))
            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());

        // SAFETY-NOTE(review): build_unchecked skips arrow's validation; this
        // relies on the record reader having produced num_values * byte_length
        // bytes and a consistent null bitmap -- confirm that invariant holds
        let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

        // Then reinterpret the fixed-size binary values as the target type
        let array: ArrayRef = match &self.data_type {
            ArrowType::Decimal128(p, s) => {
                // Decimal bytes are big-endian; sign-extend to the full width
                let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Decimal256(p, s) => {
                let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Interval(unit) => {
                // 12-byte interval: bytes [0..4] months, [4..8] days,
                // [8..12] milliseconds, all little-endian
                match unit {
                    IntervalUnit::YearMonth => {
                        // Only the months field is used for YearMonth
                        let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
                        Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
                    }
                    IntervalUnit::DayTime => {
                        // Days and milliseconds fields; months are discarded
                        let f = |b: &[u8]| {
                            IntervalDayTime::new(
                                i32::from_le_bytes(b[4..8].try_into().unwrap()),
                                i32::from_le_bytes(b[8..12].try_into().unwrap()),
                            )
                        };
                        Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
                    }
                    IntervalUnit::MonthDayNano => {
                        return Err(nyi_err!("MonthDayNano intervals not supported"));
                    }
                }
            }
            ArrowType::Float16 => {
                let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
                Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
            }
            // FixedSizeBinary (validated by the factory) needs no conversion
            _ => Arc::new(binary) as ArrayRef,
        };

        // Stash levels for get_def_levels/get_rep_levels before resetting
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        Ok(array)
    }

    /// Skips up to `num_records` records, returning the number skipped
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}
229
/// A [`ValuesBuffer`] for fixed-length byte arrays
#[derive(Default)]
struct FixedLenByteArrayBuffer {
    // Raw value bytes, stored contiguously at `byte_length` bytes per value
    buffer: Vec<u8>,
    // The fixed width of each value in bytes; `None` until the first read
    byte_length: Option<usize>,
}
236
237#[inline]
238fn move_values<F>(
239 buffer: &mut Vec<u8>,
240 byte_length: usize,
241 values_range: Range<usize>,
242 valid_mask: &[u8],
243 mut op: F,
244) where
245 F: FnMut(&mut Vec<u8>, usize, usize, usize),
246{
247 for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
248 debug_assert!(level_pos >= value_pos);
249 if level_pos <= value_pos {
250 break;
251 }
252
253 let level_pos_bytes = level_pos * byte_length;
254 let value_pos_bytes = value_pos * byte_length;
255
256 op(buffer, level_pos_bytes, value_pos_bytes, byte_length)
257 }
258}
259
260impl ValuesBuffer for FixedLenByteArrayBuffer {
261 fn pad_nulls(
262 &mut self,
263 read_offset: usize,
264 values_read: usize,
265 levels_read: usize,
266 valid_mask: &[u8],
267 ) {
268 let byte_length = self.byte_length.unwrap_or_default();
269
270 assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length);
271 self.buffer
272 .resize((read_offset + levels_read) * byte_length, 0);
273
274 let values_range = read_offset..read_offset + values_read;
275 const VEC_CUTOFF: usize = 4;
280 if byte_length > VEC_CUTOFF {
281 let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
282 let split = buffer.split_at_mut(level_pos_bytes);
283 let dst = &mut split.1[..byte_length];
284 let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length];
285 dst.copy_from_slice(src);
286 };
287 move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
288 } else {
289 let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
290 for i in 0..byte_length {
291 buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i]
292 }
293 };
294 move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
295 }
296 }
297}
298
/// A [`ColumnValueDecoder`] for fixed-length byte arrays
struct ValueDecoder {
    // The fixed size, in bytes, of each value
    byte_length: usize,
    // The most recent dictionary page, if any
    dict_page: Option<Bytes>,
    // The decoder for the current data page, set by `set_data`
    decoder: Option<Decoder>,
}
304
impl ColumnValueDecoder for ValueDecoder {
    type Buffer = FixedLenByteArrayBuffer;

    fn new(col: &ColumnDescPtr) -> Self {
        Self {
            byte_length: col.type_length() as usize,
            dict_page: None,
            decoder: None,
        }
    }

    /// Stores the dictionary page for later use by dictionary-encoded data
    /// pages, validating its encoding and length
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        if !matches!(
            encoding,
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            return Err(nyi_err!(
                "Invalid/Unsupported encoding type for dictionary: {}",
                encoding
            ));
        }
        // Dictionary values are plain-encoded fixed-width, so the page must
        // contain at least num_values * byte_length bytes
        let expected_len = num_values as usize * self.byte_length;
        if expected_len > buf.len() {
            return Err(general_err!(
                "too few bytes in dictionary page, expected {} got {}",
                expected_len,
                buf.len()
            ));
        }

        self.dict_page = Some(buf);
        Ok(())
    }

    /// Installs the decoder for a new data page with the given `encoding`,
    /// replacing any previous page state
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        self.decoder = Some(match encoding {
            Encoding::PLAIN => Decoder::Plain {
                buf: data,
                offset: 0,
            },
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
                decoder: DictIndexDecoder::new(data, num_levels, num_values),
            },
            Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
                decoder: DeltaByteArrayDecoder::new(data)?,
            },
            Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
                buf: data,
                offset: 0,
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding for fixed length byte array: {}",
                    encoding
                ))
            }
        });
        Ok(())
    }

    /// Decodes up to `num_values` values into `out`, returning the number
    /// actually read
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not been called, or if `out` was previously
    /// used with a different byte length
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        // Pin the buffer's byte length on first use; it must stay consistent
        match out.byte_length {
            Some(x) => assert_eq!(x, self.byte_length),
            None => out.byte_length = Some(self.byte_length),
        }

        match self.decoder.as_mut().unwrap() {
            Decoder::Plain { offset, buf } => {
                // Round down to whole values so a truncated page cannot
                // yield a partial value
                let to_read =
                    (num_values * self.byte_length).min(buf.len() - *offset) / self.byte_length;
                let end_offset = *offset + to_read * self.byte_length;
                out.buffer
                    .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
                *offset = end_offset;
                Ok(to_read)
            }
            Decoder::Dict { decoder } => {
                let dict = self.dict_page.as_ref().unwrap();
                // All-null pages may have an empty dictionary; nothing to read
                if dict.is_empty() {
                    return Ok(0);
                }

                decoder.read(num_values, |keys| {
                    out.buffer.reserve(keys.len() * self.byte_length);
                    for key in keys {
                        // Gather each value from the dictionary by index
                        let offset = *key as usize * self.byte_length;
                        let val = &dict.as_ref()[offset..offset + self.byte_length];
                        out.buffer.extend_from_slice(val);
                    }
                    Ok(())
                })
            }
            Decoder::Delta { decoder } => {
                let to_read = num_values.min(decoder.remaining());
                out.buffer.reserve(to_read * self.byte_length);

                decoder.read(to_read, |slice| {
                    // Delta encoding carries explicit lengths; every decoded
                    // value must match the declared fixed width
                    if slice.len() != self.byte_length {
                        return Err(general_err!(
                            "encountered array with incorrect length, got {} expected {}",
                            slice.len(),
                            self.byte_length
                        ));
                    }
                    out.buffer.extend_from_slice(slice);
                    Ok(())
                })
            }
            Decoder::ByteStreamSplit { buf, offset } => {
                // `offset` counts values (not bytes) already consumed
                let total_values = buf.len() / self.byte_length;
                let to_read = num_values.min(total_values - *offset);

                read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length);

                *offset += to_read;
                Ok(to_read)
            }
        }
    }

    /// Skips up to `num_values` values, returning the number skipped
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not been called
    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        match self.decoder.as_mut().unwrap() {
            Decoder::Plain { offset, buf } => {
                let to_read = num_values.min((buf.len() - *offset) / self.byte_length);
                *offset += to_read * self.byte_length;
                Ok(to_read)
            }
            Decoder::Dict { decoder } => decoder.skip(num_values),
            Decoder::Delta { decoder } => decoder.skip(num_values),
            Decoder::ByteStreamSplit { offset, buf } => {
                // Skipping only advances the value offset; no bytes are copied
                let total_values = buf.len() / self.byte_length;
                let to_read = num_values.min(total_values - *offset);
                *offset += to_read;
                Ok(to_read)
            }
        }
    }
}
460
/// Appends `num_values` BYTE_STREAM_SPLIT encoded values of `data_width`
/// bytes each from `src` onto `dst`, starting at value index `offset`
///
/// In this encoding the k-th byte of every value is stored contiguously in
/// the k-th "stream" of `src`; this gathers one byte per stream per value to
/// reassemble the original values.
///
/// Takes `src` as a shared byte slice: the input is only read, so there is no
/// need for the exclusive `&mut Bytes` borrow the previous signature required
/// (a `&mut Bytes` argument still deref-coerces to `&[u8]` at the call site).
fn read_byte_stream_split(
    dst: &mut Vec<u8>,
    src: &[u8],
    offset: usize,
    num_values: usize,
    data_width: usize,
) {
    // Stream length is the total number of values in the page
    let stride = src.len() / data_width;
    let idx = dst.len();
    // Zero-fill the output region, then scatter bytes into place
    dst.resize(idx + num_values * data_width, 0u8);
    let dst_slc = &mut dst[idx..idx + num_values * data_width];
    for j in 0..data_width {
        // The j-th byte of values [offset, offset + num_values)
        let src_slc = &src[offset + j * stride..offset + j * stride + num_values];
        for i in 0..num_values {
            dst_slc[i * data_width + j] = src_slc[i];
        }
    }
}
484
/// Per-page decoding state for each supported encoding
enum Decoder {
    /// PLAIN: contiguous fixed-width values; `offset` is in bytes consumed
    Plain { buf: Bytes, offset: usize },
    /// RLE_DICTIONARY / PLAIN_DICTIONARY: indices into the dictionary page
    Dict { decoder: DictIndexDecoder },
    /// DELTA_BYTE_ARRAY: prefix/suffix delta encoded values
    Delta { decoder: DeltaByteArrayDecoder },
    /// BYTE_STREAM_SPLIT: byte-transposed values; `offset` counts values consumed
    ByteStreamSplit { buf: Bytes, offset: usize },
}
491
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use crate::arrow::ArrowWriter;
    use arrow::datatypes::Field;
    use arrow::error::Result as ArrowResult;
    use arrow_array::{Array, ListArray};
    use arrow_array::{Decimal256Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    /// Round-trips a nullable list of Decimal256 values (FIXED_LEN_BYTE_ARRAY
    /// in parquet) through the writer and batched reader, verifying each read
    /// batch against a slice of the original
    #[test]
    fn test_decimal_list() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        // 7 list entries defined by the offsets buffer; the validity bitmap
        // 0b01010111 marks entries 3 and 5 as null
        let data = ArrayDataBuilder::new(ArrowType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        // Write the batch to an in-memory parquet file
        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Read it back with a batch size of 3, forcing multiple batches
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        // 7 rows at batch size 3 -> batches of 3, 3 and 1 rows
        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
}