1use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
19use crate::arrow::buffer::view_buffer::ViewBuffer;
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::GenericRecordReader;
22use crate::arrow::schema::parquet_to_arrow_field;
23use crate::basic::{ConvertedType, Encoding};
24use crate::column::page::PageIterator;
25use crate::column::reader::decoder::ColumnValueDecoder;
26use crate::data_type::Int32Type;
27use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30use crate::util::utf8::check_valid_utf8;
31use arrow_array::{ArrayRef, builder::make_view};
32use arrow_buffer::Buffer;
33use arrow_data::ByteView;
34use arrow_schema::DataType as ArrowType;
35use bytes::Bytes;
36use std::any::Any;
37
38pub fn make_byte_view_array_reader(
40 pages: Box<dyn PageIterator>,
41 column_desc: ColumnDescPtr,
42 arrow_type: Option<ArrowType>,
43) -> Result<Box<dyn ArrayReader>> {
44 let data_type = match arrow_type {
46 Some(t) => t,
47 None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
48 ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
49 _ => ArrowType::BinaryView,
50 },
51 };
52
53 match data_type {
54 ArrowType::BinaryView | ArrowType::Utf8View => {
55 let reader = GenericRecordReader::new(column_desc);
56 Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
57 }
58
59 _ => Err(general_err!(
60 "invalid data type for byte array reader read to view type - {}",
61 data_type
62 )),
63 }
64}
65
66struct ByteViewArrayReader {
68 data_type: ArrowType,
69 pages: Box<dyn PageIterator>,
70 def_levels_buffer: Option<Vec<i16>>,
71 rep_levels_buffer: Option<Vec<i16>>,
72 record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
73}
74
75impl ByteViewArrayReader {
76 fn new(
77 pages: Box<dyn PageIterator>,
78 data_type: ArrowType,
79 record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
80 ) -> Self {
81 Self {
82 data_type,
83 pages,
84 def_levels_buffer: None,
85 rep_levels_buffer: None,
86 record_reader,
87 }
88 }
89}
90
91impl ArrayReader for ByteViewArrayReader {
92 fn as_any(&self) -> &dyn Any {
93 self
94 }
95
96 fn get_data_type(&self) -> &ArrowType {
97 &self.data_type
98 }
99
100 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
101 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
102 }
103
104 fn consume_batch(&mut self) -> Result<ArrayRef> {
105 let buffer = self.record_reader.consume_record_data();
106 let null_buffer = self.record_reader.consume_bitmap_buffer();
107 self.def_levels_buffer = self.record_reader.consume_def_levels();
108 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
109 self.record_reader.reset();
110
111 let array = buffer.into_array(null_buffer, &self.data_type);
112
113 Ok(array)
114 }
115
116 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
117 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
118 }
119
120 fn get_def_levels(&self) -> Option<&[i16]> {
121 self.def_levels_buffer.as_deref()
122 }
123
124 fn get_rep_levels(&self) -> Option<&[i16]> {
125 self.rep_levels_buffer.as_deref()
126 }
127}
128
129struct ByteViewArrayColumnValueDecoder {
131 dict: Option<ViewBuffer>,
132 decoder: Option<ByteViewArrayDecoder>,
133 validate_utf8: bool,
134}
135
136impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
137 type Buffer = ViewBuffer;
138
139 fn new(desc: &ColumnDescPtr) -> Self {
140 let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
141 Self {
142 dict: None,
143 decoder: None,
144 validate_utf8,
145 }
146 }
147
148 fn set_dict(
149 &mut self,
150 buf: Bytes,
151 num_values: u32,
152 encoding: Encoding,
153 _is_sorted: bool,
154 ) -> Result<()> {
155 if !matches!(
156 encoding,
157 Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
158 ) {
159 return Err(nyi_err!(
160 "Invalid/Unsupported encoding type for dictionary: {}",
161 encoding
162 ));
163 }
164
165 let mut buffer = ViewBuffer::default();
166 let mut decoder = ByteViewArrayDecoderPlain::new(
167 buf,
168 num_values as usize,
169 Some(num_values as usize),
170 self.validate_utf8,
171 );
172 decoder.read(&mut buffer, usize::MAX)?;
173 self.dict = Some(buffer);
174 Ok(())
175 }
176
177 fn set_data(
178 &mut self,
179 encoding: Encoding,
180 data: Bytes,
181 num_levels: usize,
182 num_values: Option<usize>,
183 ) -> Result<()> {
184 self.decoder = Some(ByteViewArrayDecoder::new(
185 encoding,
186 data,
187 num_levels,
188 num_values,
189 self.validate_utf8,
190 )?);
191 Ok(())
192 }
193
194 fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
195 let decoder = self
196 .decoder
197 .as_mut()
198 .ok_or_else(|| general_err!("no decoder set"))?;
199
200 decoder.read(out, num_values, self.dict.as_ref())
201 }
202
203 fn skip_values(&mut self, num_values: usize) -> Result<usize> {
204 let decoder = self
205 .decoder
206 .as_mut()
207 .ok_or_else(|| general_err!("no decoder set"))?;
208
209 decoder.skip(num_values, self.dict.as_ref())
210 }
211}
212
213pub enum ByteViewArrayDecoder {
215 Plain(ByteViewArrayDecoderPlain),
216 Dictionary(ByteViewArrayDecoderDictionary),
217 DeltaLength(ByteViewArrayDecoderDeltaLength),
218 DeltaByteArray(ByteViewArrayDecoderDelta),
219}
220
221impl ByteViewArrayDecoder {
222 pub fn new(
223 encoding: Encoding,
224 data: Bytes,
225 num_levels: usize,
226 num_values: Option<usize>,
227 validate_utf8: bool,
228 ) -> Result<Self> {
229 let decoder = match encoding {
230 Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new(
231 data,
232 num_levels,
233 num_values,
234 validate_utf8,
235 )),
236 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
237 ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
238 data, num_levels, num_values,
239 )?)
240 }
241 Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
242 ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
243 ),
244 Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
245 ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
246 ),
247 _ => {
248 return Err(general_err!(
249 "unsupported encoding for byte array: {}",
250 encoding
251 ));
252 }
253 };
254
255 Ok(decoder)
256 }
257
258 pub fn read(
260 &mut self,
261 out: &mut ViewBuffer,
262 len: usize,
263 dict: Option<&ViewBuffer>,
264 ) -> Result<usize> {
265 match self {
266 ByteViewArrayDecoder::Plain(d) => d.read(out, len),
267 ByteViewArrayDecoder::Dictionary(d) => {
268 let dict = dict
269 .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
270 d.read(out, dict, len)
271 }
272 ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
273 ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
274 }
275 }
276
277 pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
279 match self {
280 ByteViewArrayDecoder::Plain(d) => d.skip(len),
281 ByteViewArrayDecoder::Dictionary(d) => {
282 let dict = dict
283 .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
284 d.skip(dict, len)
285 }
286 ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
287 ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
288 }
289 }
290}
291
292pub struct ByteViewArrayDecoderPlain {
294 buf: Buffer,
295 offset: usize,
296
297 validate_utf8: bool,
298
299 max_remaining_values: usize,
302}
303
304impl ByteViewArrayDecoderPlain {
305 pub fn new(
306 buf: Bytes,
307 num_levels: usize,
308 num_values: Option<usize>,
309 validate_utf8: bool,
310 ) -> Self {
311 Self {
312 buf: Buffer::from(buf),
313 offset: 0,
314 max_remaining_values: num_values.unwrap_or(num_levels),
315 validate_utf8,
316 }
317 }
318
319 pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
320 if self.validate_utf8 {
321 self.read_impl::<true>(output, len)
322 } else {
323 self.read_impl::<false>(output, len)
324 }
325 }
326
327 fn read_impl<const VALIDATE_UTF8: bool>(
328 &mut self,
329 output: &mut ViewBuffer,
330 len: usize,
331 ) -> Result<usize> {
332 let block_id = {
335 if output.buffers.last().is_some_and(|x| x.ptr_eq(&self.buf)) {
336 output.buffers.len() as u32 - 1
337 } else {
338 output.append_block(self.buf.clone())
339 }
340 };
341
342 let to_read = len.min(self.max_remaining_values);
343
344 let buf: &[u8] = self.buf.as_ref();
345 let buf_len = buf.len();
346 let mut end_offset = self.offset;
347 let mut utf8_validation_begin = end_offset;
348
349 output.views.reserve(to_read);
350
351 let views_ptr = output.views.as_mut_ptr().wrapping_add(output.views.len());
355 for i in 0..to_read {
356 let start_offset = end_offset + 4;
357
358 if start_offset > buf_len {
359 return Err(ParquetError::EOF("eof decoding byte array".into()));
360 }
361
362 let len = u32::from_le_bytes(
364 unsafe { buf.get_unchecked(end_offset..start_offset) }
365 .try_into()
366 .unwrap(),
367 );
368
369 end_offset = start_offset + len as usize;
370
371 if end_offset > buf_len {
372 return Err(ParquetError::EOF("eof decoding byte array".into()));
373 }
374
375 if VALIDATE_UTF8 {
376 if len >= 128 {
398 check_valid_utf8(unsafe {
400 buf.get_unchecked(utf8_validation_begin..start_offset - 4)
401 })?;
402 utf8_validation_begin = start_offset;
404 }
405 }
406
407 let view = make_view(
408 unsafe { buf.get_unchecked(start_offset..end_offset) },
409 block_id,
410 start_offset as u32,
411 );
412 unsafe {
414 views_ptr.add(i).write(view);
415 }
416 }
417
418 unsafe {
420 output.views.set_len(output.views.len() + to_read);
421 }
422 if VALIDATE_UTF8 {
423 check_valid_utf8(unsafe { buf.get_unchecked(utf8_validation_begin..end_offset) })?;
426 }
427
428 self.offset = end_offset;
429 self.max_remaining_values -= to_read;
430
431 Ok(to_read)
432 }
433
434 pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
435 let to_skip = to_skip.min(self.max_remaining_values);
436 let mut skip = 0;
437 let buf: &[u8] = self.buf.as_ref();
438
439 while self.offset < self.buf.len() && skip != to_skip {
440 if self.offset + 4 > buf.len() {
441 return Err(ParquetError::EOF("eof decoding byte array".into()));
442 }
443 let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
444 let len = u32::from_le_bytes(len_bytes) as usize;
445 skip += 1;
446 self.offset = self.offset + 4 + len;
447 }
448 self.max_remaining_values -= skip;
449 Ok(skip)
450 }
451}
452
453pub struct ByteViewArrayDecoderDictionary {
454 decoder: DictIndexDecoder,
455}
456
457impl ByteViewArrayDecoderDictionary {
458 fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
459 Ok(Self {
460 decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
461 })
462 }
463
464 fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
474 if dict.is_empty() || len == 0 {
475 return Ok(0);
476 }
477
478 let need_to_create_new_buffer = {
481 if output.buffers.len() >= dict.buffers.len() {
482 let offset = output.buffers.len() - dict.buffers.len();
483 output.buffers[offset..]
484 .iter()
485 .zip(dict.buffers.iter())
486 .any(|(a, b)| !a.ptr_eq(b))
487 } else {
488 true
489 }
490 };
491
492 if need_to_create_new_buffer {
493 for b in dict.buffers.iter() {
494 output.buffers.push(b.clone());
495 }
496 }
497
498 let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;
502
503 let mut error = None;
504 let read = self.decoder.read(len, |keys| {
505 if base_buffer_idx == 0 {
506 output
508 .views
509 .extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
510 Some(&view) => view,
511 None => {
512 if error.is_none() {
513 error = Some(general_err!("invalid key={} for dictionary", *k));
514 }
515 0
516 }
517 }));
518 Ok(())
519 } else {
520 output
521 .views
522 .extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
523 Some(&view) => {
524 let len = view as u32;
525 if len <= 12 {
526 view
527 } else {
528 let mut view = ByteView::from(view);
529 view.buffer_index += base_buffer_idx;
530 view.into()
531 }
532 }
533 None => {
534 if error.is_none() {
535 error = Some(general_err!("invalid key={} for dictionary", *k));
536 }
537 0
538 }
539 }));
540 Ok(())
541 }
542 })?;
543 if let Some(e) = error {
544 return Err(e);
545 }
546 Ok(read)
547 }
548
549 fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
550 if dict.is_empty() {
551 return Ok(0);
552 }
553 self.decoder.skip(to_skip)
554 }
555}
556
557pub struct ByteViewArrayDecoderDeltaLength {
559 lengths: Vec<i32>,
560 data: Bytes,
561 length_offset: usize,
562 data_offset: usize,
563 validate_utf8: bool,
564}
565
566impl ByteViewArrayDecoderDeltaLength {
567 fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
568 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
569 len_decoder.set_data(data.clone(), 0)?;
570 let values = len_decoder.values_left();
571
572 let mut lengths = vec![0; values];
573 len_decoder.get(&mut lengths)?;
574
575 let mut total_bytes = 0;
576
577 for l in lengths.iter() {
578 if *l < 0 {
579 return Err(ParquetError::General(
580 "negative delta length byte array length".to_string(),
581 ));
582 }
583 total_bytes += *l as usize;
584 }
585
586 if total_bytes + len_decoder.get_offset() > data.len() {
587 return Err(ParquetError::General(
588 "Insufficient delta length byte array bytes".to_string(),
589 ));
590 }
591
592 Ok(Self {
593 lengths,
594 data,
595 validate_utf8,
596 length_offset: 0,
597 data_offset: len_decoder.get_offset(),
598 })
599 }
600
601 fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
602 let to_read = len.min(self.lengths.len() - self.length_offset);
603 output.views.reserve(to_read);
604
605 let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
606
607 let bytes = Buffer::from(self.data.clone());
609 let block_id = output.append_block(bytes);
610
611 let mut current_offset = self.data_offset;
612 let initial_offset = current_offset;
613
614 output.views.extend(src_lengths.iter().map(|length| {
615 let len = *length as u32;
616 let start_offset = current_offset;
617 current_offset += len as usize;
618 make_view(
621 &self.data[start_offset..start_offset + len as usize],
622 block_id,
623 start_offset as u32,
624 )
625 }));
626
627 if self.validate_utf8 {
629 check_valid_utf8(&self.data[initial_offset..current_offset])?;
630 }
631
632 self.data_offset = current_offset;
633 self.length_offset += to_read;
634
635 Ok(to_read)
636 }
637
638 fn skip(&mut self, to_skip: usize) -> Result<usize> {
639 let remain_values = self.lengths.len() - self.length_offset;
640 let to_skip = remain_values.min(to_skip);
641
642 let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
643 let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
644
645 self.data_offset += total_bytes;
646 self.length_offset += to_skip;
647 Ok(to_skip)
648 }
649}
650
651pub struct ByteViewArrayDecoderDelta {
653 decoder: DeltaByteArrayDecoder,
654 validate_utf8: bool,
655}
656
657impl ByteViewArrayDecoderDelta {
658 fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
659 Ok(Self {
660 decoder: DeltaByteArrayDecoder::new(data)?,
661 validate_utf8,
662 })
663 }
664
665 fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
674 output.views.reserve(len.min(self.decoder.remaining()));
675
676 let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
678
679 let buffer_id = output.buffers.len() as u32;
680
681 let read = if !self.validate_utf8 {
682 self.decoder.read(len, |bytes| {
683 let offset = array_buffer.len();
684 let view = make_view(bytes, buffer_id, offset as u32);
685 if bytes.len() > 12 {
686 array_buffer.extend_from_slice(bytes);
688 }
689
690 unsafe {
694 output.append_raw_view_unchecked(view);
695 }
696 Ok(())
697 })?
698 } else {
699 let mut utf8_validation_buffer = Vec::with_capacity(4096);
703
704 let v = self.decoder.read(len, |bytes| {
705 let offset = array_buffer.len();
706 let view = make_view(bytes, buffer_id, offset as u32);
707 if bytes.len() > 12 {
708 array_buffer.extend_from_slice(bytes);
710 } else {
711 utf8_validation_buffer.extend_from_slice(bytes);
712 }
713
714 unsafe {
719 output.append_raw_view_unchecked(view);
720 }
721 Ok(())
722 })?;
723 check_valid_utf8(&array_buffer)?;
724 check_valid_utf8(&utf8_validation_buffer)?;
725 v
726 };
727
728 let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
729 assert_eq!(actual_block_id, buffer_id);
730 Ok(read)
731 }
732
733 fn skip(&mut self, to_skip: usize) -> Result<usize> {
734 self.decoder.skip(to_skip)
735 }
736}
737
#[cfg(test)]
mod tests {
    use arrow_array::StringViewArray;
    use arrow_buffer::Buffer;

    use crate::{
        arrow::{
            array_reader::test_util::{byte_array_all_encodings, encode_byte_array, utf8_column},
            buffer::view_buffer::ViewBuffer,
            record_reader::buffer::ValuesBuffer,
        },
        basic::Encoding,
        column::reader::decoder::ColumnValueDecoder,
        data_type::ByteArray,
    };

    use super::*;

    /// Decodes the same four strings from every encoding produced by the test
    /// utility and checks the null-padded result.
    #[test]
    fn test_byte_array_string_view_decoder() {
        let input = vec!["hello", "world", "large payload over 12 bytes", "b"];
        let (pages, encoded_dictionary) = byte_array_all_encodings(input);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        // (values requested, values expected back); the last read finds the
        // page exhausted
        let reads = [(1, 1), (1, 1), (2, 2), (4, 0)];

        for (encoding, page) in pages {
            let mut output = ViewBuffer::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            for (requested, expected) in reads {
                assert_eq!(decoder.read(&mut output, requested).unwrap(), expected);
            }

            assert_eq!(output.views.len(), 4);

            let valid = [false, false, true, true, false, true, true, false, false];
            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), &ArrowType::Utf8View);
            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

            let expected = vec![
                None,
                None,
                Some("hello"),
                Some("world"),
                None,
                Some("large payload over 12 bytes"),
                Some("b"),
                None,
                None,
            ];
            assert_eq!(strings.iter().collect::<Vec<_>>(), expected);
        }
    }

    /// Repeated reads from one plain page must keep reusing a single block.
    #[test]
    fn test_byte_view_array_plain_decoder_reuse_buffer() {
        let byte_array: Vec<ByteArray> =
            vec!["hello", "world", "large payload over 12 bytes", "b"]
                .into_iter()
                .map(|x| x.into())
                .collect();
        let pages = encode_byte_array(Encoding::PLAIN, &byte_array);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        let mut view_buffer = ViewBuffer::default();
        decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();

        // Two reads of short, inlined values
        decoder.read(&mut view_buffer, 1).unwrap();
        decoder.read(&mut view_buffer, 1).unwrap();
        assert_eq!(view_buffer.buffers.len(), 1);

        // A third read, of the value that is too long to inline
        decoder.read(&mut view_buffer, 1).unwrap();
        assert_eq!(view_buffer.buffers.len(), 1);
    }
}