1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::buffer::bit_util::sign_extend_be;
20use crate::arrow::buffer::offset_buffer::OffsetBuffer;
21use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
22use crate::arrow::record_reader::GenericRecordReader;
23use crate::arrow::schema::parquet_to_arrow_field;
24use crate::basic::{ConvertedType, Encoding};
25use crate::column::page::PageIterator;
26use crate::column::reader::decoder::ColumnValueDecoder;
27use crate::data_type::Int32Type;
28use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use arrow_array::{
32 Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, OffsetSizeTrait,
33};
34use arrow_buffer::i256;
35use arrow_schema::DataType as ArrowType;
36use bytes::Bytes;
37use std::any::Any;
38use std::sync::Arc;
39
40pub fn make_byte_array_reader(
42 pages: Box<dyn PageIterator>,
43 column_desc: ColumnDescPtr,
44 arrow_type: Option<ArrowType>,
45) -> Result<Box<dyn ArrayReader>> {
46 let data_type = match arrow_type {
48 Some(t) => t,
49 None => parquet_to_arrow_field(column_desc.as_ref())?
50 .data_type()
51 .clone(),
52 };
53
54 match data_type {
55 ArrowType::Binary
56 | ArrowType::Utf8
57 | ArrowType::Decimal128(_, _)
58 | ArrowType::Decimal256(_, _) => {
59 let reader = GenericRecordReader::new(column_desc);
60 Ok(Box::new(ByteArrayReader::<i32>::new(
61 pages, data_type, reader,
62 )))
63 }
64 ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
65 let reader = GenericRecordReader::new(column_desc);
66 Ok(Box::new(ByteArrayReader::<i64>::new(
67 pages, data_type, reader,
68 )))
69 }
70 _ => Err(general_err!(
71 "invalid data type for byte array reader - {}",
72 data_type
73 )),
74 }
75}
76
/// An [`ArrayReader`] for variable-length byte array columns, generic over the
/// offset width `I` (`i32` for Binary/Utf8, `i64` for the Large variants).
struct ByteArrayReader<I: OffsetSizeTrait> {
    /// The arrow type produced by [`ArrayReader::consume_batch`]
    data_type: ArrowType,
    /// Source of column pages to decode
    pages: Box<dyn PageIterator>,
    /// Definition levels captured by the most recent `consume_batch`, if any
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels captured by the most recent `consume_batch`, if any
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates decoded values and levels across read calls
    record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
}
85
86impl<I: OffsetSizeTrait> ByteArrayReader<I> {
87 fn new(
88 pages: Box<dyn PageIterator>,
89 data_type: ArrowType,
90 record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
91 ) -> Self {
92 Self {
93 data_type,
94 pages,
95 def_levels_buffer: None,
96 rep_levels_buffer: None,
97 record_reader,
98 }
99 }
100}
101
impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads up to `batch_size` records from the page iterator into the
    /// internal record reader, returning the number of records read.
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    /// Converts the records accumulated so far into an arrow array and resets
    /// the record reader.
    ///
    /// Decimal columns are first materialized as `BinaryArray` and then each
    /// value is decoded from its big-endian bytes (sign-extended to the full
    /// integer width); other types are built directly from the offset buffer.
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let buffer = self.record_reader.consume_record_data();
        let null_buffer = self.record_reader.consume_bitmap_buffer();
        // Stash the level buffers so get_def_levels/get_rep_levels can serve
        // them after the reset below
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        let array: ArrayRef = match self.data_type {
            ArrowType::Decimal128(p, s) => {
                let array = buffer.into_array(null_buffer, ArrowType::Binary);
                let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
                // Empty (null-slot) values map to zero; otherwise decode
                // big-endian bytes into an i128
                let decimal = Decimal128Array::from_unary(binary, |x| match x.len() {
                    0 => i128::default(),
                    _ => i128::from_be_bytes(sign_extend_be(x)),
                })
                .with_precision_and_scale(p, s)?;
                Arc::new(decimal)
            }
            ArrowType::Decimal256(p, s) => {
                let array = buffer.into_array(null_buffer, ArrowType::Binary);
                let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
                let decimal = Decimal256Array::from_unary(binary, |x| match x.len() {
                    0 => i256::default(),
                    _ => i256::from_be_bytes(sign_extend_be(x)),
                })
                .with_precision_and_scale(p, s)?;
                Arc::new(decimal)
            }
            _ => buffer.into_array(null_buffer, self.data_type.clone()),
        };

        Ok(array)
    }

    /// Skips up to `num_records` records, returning the number skipped.
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}
168
/// A [`ColumnValueDecoder`] for variable-length byte arrays.
struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
    /// Fully decoded dictionary page, if one has been supplied
    dict: Option<OffsetBuffer<I>>,
    /// Decoder for the current data page, set by `set_data`
    decoder: Option<ByteArrayDecoder>,
    /// Whether decoded values must be validated as UTF-8
    validate_utf8: bool,
}
175
176impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
177 type Buffer = OffsetBuffer<I>;
178
179 fn new(desc: &ColumnDescPtr) -> Self {
180 let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
181 Self {
182 dict: None,
183 decoder: None,
184 validate_utf8,
185 }
186 }
187
188 fn set_dict(
189 &mut self,
190 buf: Bytes,
191 num_values: u32,
192 encoding: Encoding,
193 _is_sorted: bool,
194 ) -> Result<()> {
195 if !matches!(
196 encoding,
197 Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
198 ) {
199 return Err(nyi_err!(
200 "Invalid/Unsupported encoding type for dictionary: {}",
201 encoding
202 ));
203 }
204
205 let mut buffer = OffsetBuffer::default();
206 let mut decoder = ByteArrayDecoderPlain::new(
207 buf,
208 num_values as usize,
209 Some(num_values as usize),
210 self.validate_utf8,
211 );
212 decoder.read(&mut buffer, usize::MAX)?;
213 self.dict = Some(buffer);
214 Ok(())
215 }
216
217 fn set_data(
218 &mut self,
219 encoding: Encoding,
220 data: Bytes,
221 num_levels: usize,
222 num_values: Option<usize>,
223 ) -> Result<()> {
224 self.decoder = Some(ByteArrayDecoder::new(
225 encoding,
226 data,
227 num_levels,
228 num_values,
229 self.validate_utf8,
230 )?);
231 Ok(())
232 }
233
234 fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
235 let decoder = self
236 .decoder
237 .as_mut()
238 .ok_or_else(|| general_err!("no decoder set"))?;
239
240 decoder.read(out, num_values, self.dict.as_ref())
241 }
242
243 fn skip_values(&mut self, num_values: usize) -> Result<usize> {
244 let decoder = self
245 .decoder
246 .as_mut()
247 .ok_or_else(|| general_err!("no decoder set"))?;
248
249 decoder.skip(num_values, self.dict.as_ref())
250 }
251}
252
/// A byte array decoder dispatching on the page encoding.
pub enum ByteArrayDecoder {
    /// PLAIN encoded pages
    Plain(ByteArrayDecoderPlain),
    /// RLE_DICTIONARY / PLAIN_DICTIONARY encoded pages
    Dictionary(ByteArrayDecoderDictionary),
    /// DELTA_LENGTH_BYTE_ARRAY encoded pages
    DeltaLength(ByteArrayDecoderDeltaLength),
    /// DELTA_BYTE_ARRAY encoded pages
    DeltaByteArray(ByteArrayDecoderDelta),
}
260
261impl ByteArrayDecoder {
262 pub fn new(
263 encoding: Encoding,
264 data: Bytes,
265 num_levels: usize,
266 num_values: Option<usize>,
267 validate_utf8: bool,
268 ) -> Result<Self> {
269 let decoder = match encoding {
270 Encoding::PLAIN => ByteArrayDecoder::Plain(ByteArrayDecoderPlain::new(
271 data,
272 num_levels,
273 num_values,
274 validate_utf8,
275 )),
276 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary(
277 ByteArrayDecoderDictionary::new(data, num_levels, num_values),
278 ),
279 Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
280 ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
281 ),
282 Encoding::DELTA_BYTE_ARRAY => {
283 ByteArrayDecoder::DeltaByteArray(ByteArrayDecoderDelta::new(data, validate_utf8)?)
284 }
285 _ => {
286 return Err(general_err!(
287 "unsupported encoding for byte array: {}",
288 encoding
289 ))
290 }
291 };
292
293 Ok(decoder)
294 }
295
296 pub fn read<I: OffsetSizeTrait>(
298 &mut self,
299 out: &mut OffsetBuffer<I>,
300 len: usize,
301 dict: Option<&OffsetBuffer<I>>,
302 ) -> Result<usize> {
303 match self {
304 ByteArrayDecoder::Plain(d) => d.read(out, len),
305 ByteArrayDecoder::Dictionary(d) => {
306 let dict =
307 dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;
308
309 d.read(out, dict, len)
310 }
311 ByteArrayDecoder::DeltaLength(d) => d.read(out, len),
312 ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
313 }
314 }
315
316 pub fn skip<I: OffsetSizeTrait>(
318 &mut self,
319 len: usize,
320 dict: Option<&OffsetBuffer<I>>,
321 ) -> Result<usize> {
322 match self {
323 ByteArrayDecoder::Plain(d) => d.skip(len),
324 ByteArrayDecoder::Dictionary(d) => {
325 let dict =
326 dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;
327
328 d.skip(dict, len)
329 }
330 ByteArrayDecoder::DeltaLength(d) => d.skip(len),
331 ByteArrayDecoder::DeltaByteArray(d) => d.skip(len),
332 }
333 }
334}
335
/// Decoder for PLAIN encoded byte arrays (4-byte little-endian length prefix
/// followed by the value bytes).
pub struct ByteArrayDecoderPlain {
    /// The raw page bytes
    buf: Bytes,
    /// Byte offset into `buf` of the next value to decode
    offset: usize,
    /// Whether decoded values must be validated as UTF-8
    validate_utf8: bool,

    /// Upper bound on the number of values remaining in `buf`
    max_remaining_values: usize,
}
346
347impl ByteArrayDecoderPlain {
348 pub fn new(
349 buf: Bytes,
350 num_levels: usize,
351 num_values: Option<usize>,
352 validate_utf8: bool,
353 ) -> Self {
354 Self {
355 buf,
356 validate_utf8,
357 offset: 0,
358 max_remaining_values: num_values.unwrap_or(num_levels),
359 }
360 }
361
362 pub fn read<I: OffsetSizeTrait>(
363 &mut self,
364 output: &mut OffsetBuffer<I>,
365 len: usize,
366 ) -> Result<usize> {
367 let initial_values_length = output.values.len();
368
369 let to_read = len.min(self.max_remaining_values);
370 output.offsets.reserve(to_read);
371
372 let remaining_bytes = self.buf.len() - self.offset;
373 if remaining_bytes == 0 {
374 return Ok(0);
375 }
376
377 let estimated_bytes = remaining_bytes
378 .checked_mul(to_read)
379 .map(|x| x / self.max_remaining_values)
380 .unwrap_or_default();
381
382 output.values.reserve(estimated_bytes);
383
384 let mut read = 0;
385
386 let buf = self.buf.as_ref();
387 while self.offset < self.buf.len() && read != to_read {
388 if self.offset + 4 > buf.len() {
389 return Err(ParquetError::EOF("eof decoding byte array".into()));
390 }
391 let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
392 let len = u32::from_le_bytes(len_bytes);
393
394 let start_offset = self.offset + 4;
395 let end_offset = start_offset + len as usize;
396 if end_offset > buf.len() {
397 return Err(ParquetError::EOF("eof decoding byte array".into()));
398 }
399
400 output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;
401
402 self.offset = end_offset;
403 read += 1;
404 }
405 self.max_remaining_values -= to_read;
406
407 if self.validate_utf8 {
408 output.check_valid_utf8(initial_values_length)?;
409 }
410 Ok(to_read)
411 }
412
413 pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
414 let to_skip = to_skip.min(self.max_remaining_values);
415 let mut skip = 0;
416 let buf = self.buf.as_ref();
417
418 while self.offset < self.buf.len() && skip != to_skip {
419 if self.offset + 4 > buf.len() {
420 return Err(ParquetError::EOF("eof decoding byte array".into()));
421 }
422 let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
423 let len = u32::from_le_bytes(len_bytes) as usize;
424 skip += 1;
425 self.offset = self.offset + 4 + len;
426 }
427 self.max_remaining_values -= skip;
428 Ok(skip)
429 }
430}
431
/// Decoder for DELTA_LENGTH_BYTE_ARRAY encoded byte arrays (all value lengths
/// delta-encoded up front, followed by the value bytes back-to-back).
pub struct ByteArrayDecoderDeltaLength {
    /// All value lengths, decoded eagerly in `new`
    lengths: Vec<i32>,
    /// The raw page bytes
    data: Bytes,
    /// Index into `lengths` of the next value
    length_offset: usize,
    /// Byte offset into `data` of the next value
    data_offset: usize,
    /// Whether decoded values must be validated as UTF-8
    validate_utf8: bool,
}
440
441impl ByteArrayDecoderDeltaLength {
442 fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
443 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
444 len_decoder.set_data(data.clone(), 0)?;
445 let values = len_decoder.values_left();
446
447 let mut lengths = vec![0; values];
448 len_decoder.get(&mut lengths)?;
449
450 let mut total_bytes = 0;
451
452 for l in lengths.iter() {
453 if *l < 0 {
454 return Err(ParquetError::General(
455 "negative delta length byte array length".to_string(),
456 ));
457 }
458 total_bytes += *l as usize;
459 }
460
461 if total_bytes + len_decoder.get_offset() > data.len() {
462 return Err(ParquetError::General(
463 "Insufficient delta length byte array bytes".to_string(),
464 ));
465 }
466
467 Ok(Self {
468 lengths,
469 data,
470 validate_utf8,
471 length_offset: 0,
472 data_offset: len_decoder.get_offset(),
473 })
474 }
475
476 fn read<I: OffsetSizeTrait>(
477 &mut self,
478 output: &mut OffsetBuffer<I>,
479 len: usize,
480 ) -> Result<usize> {
481 let initial_values_length = output.values.len();
482
483 let to_read = len.min(self.lengths.len() - self.length_offset);
484 output.offsets.reserve(to_read);
485
486 let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
487
488 let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
489 output.values.reserve(total_bytes);
490
491 let mut current_offset = self.data_offset;
492 for length in src_lengths {
493 let end_offset = current_offset + *length as usize;
494 output.try_push(
495 &self.data.as_ref()[current_offset..end_offset],
496 self.validate_utf8,
497 )?;
498 current_offset = end_offset;
499 }
500
501 self.data_offset = current_offset;
502 self.length_offset += to_read;
503
504 if self.validate_utf8 {
505 output.check_valid_utf8(initial_values_length)?;
506 }
507 Ok(to_read)
508 }
509
510 fn skip(&mut self, to_skip: usize) -> Result<usize> {
511 let remain_values = self.lengths.len() - self.length_offset;
512 let to_skip = remain_values.min(to_skip);
513
514 let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
515 let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
516
517 self.data_offset += total_bytes;
518 self.length_offset += to_skip;
519 Ok(to_skip)
520 }
521}
522
/// Decoder for DELTA_BYTE_ARRAY encoded byte arrays.
pub struct ByteArrayDecoderDelta {
    /// Underlying delta byte array decoder
    decoder: DeltaByteArrayDecoder,
    /// Whether decoded values must be validated as UTF-8
    validate_utf8: bool,
}
528
529impl ByteArrayDecoderDelta {
530 fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
531 Ok(Self {
532 decoder: DeltaByteArrayDecoder::new(data)?,
533 validate_utf8,
534 })
535 }
536
537 fn read<I: OffsetSizeTrait>(
538 &mut self,
539 output: &mut OffsetBuffer<I>,
540 len: usize,
541 ) -> Result<usize> {
542 let initial_values_length = output.values.len();
543 output.offsets.reserve(len.min(self.decoder.remaining()));
544
545 let read = self
546 .decoder
547 .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;
548
549 if self.validate_utf8 {
550 output.check_valid_utf8(initial_values_length)?;
551 }
552 Ok(read)
553 }
554
555 fn skip(&mut self, to_skip: usize) -> Result<usize> {
556 self.decoder.skip(to_skip)
557 }
558}
559
/// Decoder for dictionary-encoded byte arrays; resolves indices against a
/// previously decoded dictionary page.
pub struct ByteArrayDecoderDictionary {
    /// Underlying dictionary index decoder
    decoder: DictIndexDecoder,
}
564
565impl ByteArrayDecoderDictionary {
566 fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
567 Self {
568 decoder: DictIndexDecoder::new(data, num_levels, num_values),
569 }
570 }
571
572 fn read<I: OffsetSizeTrait>(
573 &mut self,
574 output: &mut OffsetBuffer<I>,
575 dict: &OffsetBuffer<I>,
576 len: usize,
577 ) -> Result<usize> {
578 if dict.is_empty() {
580 return Ok(0);
581 }
582
583 self.decoder.read(len, |keys| {
584 output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice())
585 })
586 }
587
588 fn skip<I: OffsetSizeTrait>(
589 &mut self,
590 dict: &OffsetBuffer<I>,
591 to_skip: usize,
592 ) -> Result<usize> {
593 if dict.is_empty() {
595 return Ok(0);
596 }
597
598 self.decoder.skip(to_skip)
599 }
600}
601
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
    use crate::arrow::record_reader::buffer::ValuesBuffer;
    use arrow_array::{Array, StringArray};
    use arrow_buffer::Buffer;

    // Decodes the same four values through every supported encoding, checking
    // values and offsets after each incremental read, then null padding.
    #[test]
    fn test_byte_array_decoder() {
        let (pages, encoded_dictionary) =
            byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

        let column_desc = utf8_column();
        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        for (encoding, page) in pages {
            let mut output = OffsetBuffer::<i32>::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

            assert_eq!(output.values.as_slice(), "hello".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5]);

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);

            assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
            assert_eq!(output.values.as_slice(), "helloworldab".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]);

            // All four values consumed - further reads return nothing
            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

            let valid = [false, false, true, true, false, true, true, false, false];
            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            // Spread the four values over the nine level slots per `valid`
            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
            let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

            assert_eq!(
                strings.iter().collect::<Vec<_>>(),
                vec![
                    None,
                    None,
                    Some("hello"),
                    Some("world"),
                    None,
                    Some("a"),
                    Some("b"),
                    None,
                    None,
                ]
            );
        }
    }

    // Verifies skip_values advances past values without emitting them, for
    // every supported encoding.
    #[test]
    fn test_byte_array_decoder_skip() {
        let (pages, encoded_dictionary) =
            byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

        let column_desc = utf8_column();
        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        for (encoding, page) in pages {
            let mut output = OffsetBuffer::<i32>::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

            assert_eq!(output.values.as_slice(), "hello".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5]);

            // Skip "world" and "a"; the next read should yield "b"
            assert_eq!(decoder.skip_values(1).unwrap(), 1);
            assert_eq!(decoder.skip_values(1).unwrap(), 1);

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(output.values.as_slice(), "hellob".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);

            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

            let valid = [false, false, true, true, false, false];
            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
            let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

            assert_eq!(
                strings.iter().collect::<Vec<_>>(),
                vec![None, None, Some("hello"), Some("b"), None, None,]
            );
        }
    }

    // Verifies pages with no values read and skip zero values for every
    // encoding.
    #[test]
    fn test_byte_array_decoder_nulls() {
        let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());

        let column_desc = utf8_column();
        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        // Test reads
        for (encoding, page) in pages.clone() {
            let mut output = OffsetBuffer::<i32>::default();
            decoder.set_data(encoding, page, 4, None).unwrap();
            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
        }

        // Test skips
        for (encoding, page) in pages {
            decoder.set_data(encoding, page, 4, None).unwrap();
            assert_eq!(decoder.skip_values(1024).unwrap(), 0);
        }
    }
}