1use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::buffer::view_buffer::ViewBuffer;
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::GenericRecordReader;
22use crate::arrow::schema::parquet_to_arrow_field;
23use crate::basic::{ConvertedType, Encoding};
24use crate::column::page::PageIterator;
25use crate::column::reader::decoder::ColumnValueDecoder;
26use crate::data_type::Int32Type;
27use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
28use crate::errors::{ParquetError, Result};
29use crate::schema::types::ColumnDescPtr;
30use crate::util::utf8::check_valid_utf8;
31use arrow_array::{builder::make_view, ArrayRef};
32use arrow_buffer::Buffer;
33use arrow_data::ByteView;
34use arrow_schema::DataType as ArrowType;
35use bytes::Bytes;
36use std::any::Any;
37
38pub fn make_byte_view_array_reader(
40 pages: Box<dyn PageIterator>,
41 column_desc: ColumnDescPtr,
42 arrow_type: Option<ArrowType>,
43) -> Result<Box<dyn ArrayReader>> {
44 let data_type = match arrow_type {
46 Some(t) => t,
47 None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
48 ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
49 _ => ArrowType::BinaryView,
50 },
51 };
52
53 match data_type {
54 ArrowType::BinaryView | ArrowType::Utf8View => {
55 let reader = GenericRecordReader::new(column_desc);
56 Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
57 }
58
59 _ => Err(general_err!(
60 "invalid data type for byte array reader read to view type - {}",
61 data_type
62 )),
63 }
64}
65
66struct ByteViewArrayReader {
68 data_type: ArrowType,
69 pages: Box<dyn PageIterator>,
70 def_levels_buffer: Option<Vec<i16>>,
71 rep_levels_buffer: Option<Vec<i16>>,
72 record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
73}
74
75impl ByteViewArrayReader {
76 fn new(
77 pages: Box<dyn PageIterator>,
78 data_type: ArrowType,
79 record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
80 ) -> Self {
81 Self {
82 data_type,
83 pages,
84 def_levels_buffer: None,
85 rep_levels_buffer: None,
86 record_reader,
87 }
88 }
89}
90
91impl ArrayReader for ByteViewArrayReader {
92 fn as_any(&self) -> &dyn Any {
93 self
94 }
95
96 fn get_data_type(&self) -> &ArrowType {
97 &self.data_type
98 }
99
100 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
101 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
102 }
103
104 fn consume_batch(&mut self) -> Result<ArrayRef> {
105 let buffer = self.record_reader.consume_record_data();
106 let null_buffer = self.record_reader.consume_bitmap_buffer();
107 self.def_levels_buffer = self.record_reader.consume_def_levels();
108 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
109 self.record_reader.reset();
110
111 let array = buffer.into_array(null_buffer, &self.data_type);
112
113 Ok(array)
114 }
115
116 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
117 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
118 }
119
120 fn get_def_levels(&self) -> Option<&[i16]> {
121 self.def_levels_buffer.as_deref()
122 }
123
124 fn get_rep_levels(&self) -> Option<&[i16]> {
125 self.rep_levels_buffer.as_deref()
126 }
127}
128
129struct ByteViewArrayColumnValueDecoder {
131 dict: Option<ViewBuffer>,
132 decoder: Option<ByteViewArrayDecoder>,
133 validate_utf8: bool,
134}
135
136impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
137 type Buffer = ViewBuffer;
138
139 fn new(desc: &ColumnDescPtr) -> Self {
140 let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
141 Self {
142 dict: None,
143 decoder: None,
144 validate_utf8,
145 }
146 }
147
148 fn set_dict(
149 &mut self,
150 buf: Bytes,
151 num_values: u32,
152 encoding: Encoding,
153 _is_sorted: bool,
154 ) -> Result<()> {
155 if !matches!(
156 encoding,
157 Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
158 ) {
159 return Err(nyi_err!(
160 "Invalid/Unsupported encoding type for dictionary: {}",
161 encoding
162 ));
163 }
164
165 let mut buffer = ViewBuffer::default();
166 let mut decoder = ByteViewArrayDecoderPlain::new(
167 buf,
168 num_values as usize,
169 Some(num_values as usize),
170 self.validate_utf8,
171 );
172 decoder.read(&mut buffer, usize::MAX)?;
173 self.dict = Some(buffer);
174 Ok(())
175 }
176
177 fn set_data(
178 &mut self,
179 encoding: Encoding,
180 data: Bytes,
181 num_levels: usize,
182 num_values: Option<usize>,
183 ) -> Result<()> {
184 self.decoder = Some(ByteViewArrayDecoder::new(
185 encoding,
186 data,
187 num_levels,
188 num_values,
189 self.validate_utf8,
190 )?);
191 Ok(())
192 }
193
194 fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
195 let decoder = self
196 .decoder
197 .as_mut()
198 .ok_or_else(|| general_err!("no decoder set"))?;
199
200 decoder.read(out, num_values, self.dict.as_ref())
201 }
202
203 fn skip_values(&mut self, num_values: usize) -> Result<usize> {
204 let decoder = self
205 .decoder
206 .as_mut()
207 .ok_or_else(|| general_err!("no decoder set"))?;
208
209 decoder.skip(num_values, self.dict.as_ref())
210 }
211}
212
213pub enum ByteViewArrayDecoder {
215 Plain(ByteViewArrayDecoderPlain),
216 Dictionary(ByteViewArrayDecoderDictionary),
217 DeltaLength(ByteViewArrayDecoderDeltaLength),
218 DeltaByteArray(ByteViewArrayDecoderDelta),
219}
220
221impl ByteViewArrayDecoder {
222 pub fn new(
223 encoding: Encoding,
224 data: Bytes,
225 num_levels: usize,
226 num_values: Option<usize>,
227 validate_utf8: bool,
228 ) -> Result<Self> {
229 let decoder = match encoding {
230 Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new(
231 data,
232 num_levels,
233 num_values,
234 validate_utf8,
235 )),
236 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
237 ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
238 data, num_levels, num_values,
239 ))
240 }
241 Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
242 ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
243 ),
244 Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
245 ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
246 ),
247 _ => {
248 return Err(general_err!(
249 "unsupported encoding for byte array: {}",
250 encoding
251 ))
252 }
253 };
254
255 Ok(decoder)
256 }
257
258 pub fn read(
260 &mut self,
261 out: &mut ViewBuffer,
262 len: usize,
263 dict: Option<&ViewBuffer>,
264 ) -> Result<usize> {
265 match self {
266 ByteViewArrayDecoder::Plain(d) => d.read(out, len),
267 ByteViewArrayDecoder::Dictionary(d) => {
268 let dict = dict
269 .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
270 d.read(out, dict, len)
271 }
272 ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
273 ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
274 }
275 }
276
277 pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
279 match self {
280 ByteViewArrayDecoder::Plain(d) => d.skip(len),
281 ByteViewArrayDecoder::Dictionary(d) => {
282 let dict = dict
283 .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
284 d.skip(dict, len)
285 }
286 ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
287 ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
288 }
289 }
290}
291
292pub struct ByteViewArrayDecoderPlain {
294 buf: Buffer,
295 offset: usize,
296
297 validate_utf8: bool,
298
299 max_remaining_values: usize,
302}
303
304impl ByteViewArrayDecoderPlain {
305 pub fn new(
306 buf: Bytes,
307 num_levels: usize,
308 num_values: Option<usize>,
309 validate_utf8: bool,
310 ) -> Self {
311 Self {
312 buf: Buffer::from(buf),
313 offset: 0,
314 max_remaining_values: num_values.unwrap_or(num_levels),
315 validate_utf8,
316 }
317 }
318
319 pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
320 let block_id = {
323 if output.buffers.last().is_some_and(|x| x.ptr_eq(&self.buf)) {
324 output.buffers.len() as u32 - 1
325 } else {
326 output.append_block(self.buf.clone())
327 }
328 };
329
330 let to_read = len.min(self.max_remaining_values);
331
332 let buf = self.buf.as_ref();
333 let mut read = 0;
334 output.views.reserve(to_read);
335
336 let mut utf8_validation_begin = self.offset;
337 while self.offset < self.buf.len() && read != to_read {
338 if self.offset + 4 > self.buf.len() {
339 return Err(ParquetError::EOF("eof decoding byte array".into()));
340 }
341 let len_bytes: [u8; 4] = unsafe {
342 buf.get_unchecked(self.offset..self.offset + 4)
343 .try_into()
344 .unwrap()
345 };
346 let len = u32::from_le_bytes(len_bytes);
347
348 let start_offset = self.offset + 4;
349 let end_offset = start_offset + len as usize;
350 if end_offset > buf.len() {
351 return Err(ParquetError::EOF("eof decoding byte array".into()));
352 }
353
354 if self.validate_utf8 {
355 if len < 128 {
377 } else {
380 check_valid_utf8(unsafe {
382 buf.get_unchecked(utf8_validation_begin..self.offset)
383 })?;
384 utf8_validation_begin = start_offset;
386 }
387 }
388
389 unsafe {
390 output.append_view_unchecked(block_id, start_offset as u32, len);
391 }
392 self.offset = end_offset;
393 read += 1;
394 }
395
396 if self.validate_utf8 {
398 check_valid_utf8(unsafe { buf.get_unchecked(utf8_validation_begin..self.offset) })?;
399 }
400
401 self.max_remaining_values -= to_read;
402 Ok(to_read)
403 }
404
405 pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
406 let to_skip = to_skip.min(self.max_remaining_values);
407 let mut skip = 0;
408 let buf = self.buf.as_ref();
409
410 while self.offset < self.buf.len() && skip != to_skip {
411 if self.offset + 4 > buf.len() {
412 return Err(ParquetError::EOF("eof decoding byte array".into()));
413 }
414 let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
415 let len = u32::from_le_bytes(len_bytes) as usize;
416 skip += 1;
417 self.offset = self.offset + 4 + len;
418 }
419 self.max_remaining_values -= skip;
420 Ok(skip)
421 }
422}
423
424pub struct ByteViewArrayDecoderDictionary {
425 decoder: DictIndexDecoder,
426}
427
428impl ByteViewArrayDecoderDictionary {
429 fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
430 Self {
431 decoder: DictIndexDecoder::new(data, num_levels, num_values),
432 }
433 }
434
435 fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
443 if dict.is_empty() || len == 0 {
444 return Ok(0);
445 }
446
447 let need_to_create_new_buffer = {
450 if output.buffers.len() >= dict.buffers.len() {
451 let offset = output.buffers.len() - dict.buffers.len();
452 output.buffers[offset..]
453 .iter()
454 .zip(dict.buffers.iter())
455 .any(|(a, b)| !a.ptr_eq(b))
456 } else {
457 true
458 }
459 };
460
461 if need_to_create_new_buffer {
462 for b in dict.buffers.iter() {
463 output.buffers.push(b.clone());
464 }
465 }
466
467 let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;
471
472 self.decoder.read(len, |keys| {
473 for k in keys {
474 let view = dict
475 .views
476 .get(*k as usize)
477 .ok_or_else(|| general_err!("invalid key={} for dictionary", *k))?;
478 let len = *view as u32;
479 if len <= 12 {
480 unsafe {
483 output.append_raw_view_unchecked(view);
484 }
485 } else {
486 let mut view = ByteView::from(*view);
488 view.buffer_index += base_buffer_idx;
489 unsafe {
492 output.append_raw_view_unchecked(&view.into());
493 }
494 }
495 }
496 Ok(())
497 })
498 }
499
500 fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
501 if dict.is_empty() {
502 return Ok(0);
503 }
504 self.decoder.skip(to_skip)
505 }
506}
507
508pub struct ByteViewArrayDecoderDeltaLength {
510 lengths: Vec<i32>,
511 data: Bytes,
512 length_offset: usize,
513 data_offset: usize,
514 validate_utf8: bool,
515}
516
517impl ByteViewArrayDecoderDeltaLength {
518 fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
519 let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
520 len_decoder.set_data(data.clone(), 0)?;
521 let values = len_decoder.values_left();
522
523 let mut lengths = vec![0; values];
524 len_decoder.get(&mut lengths)?;
525
526 let mut total_bytes = 0;
527
528 for l in lengths.iter() {
529 if *l < 0 {
530 return Err(ParquetError::General(
531 "negative delta length byte array length".to_string(),
532 ));
533 }
534 total_bytes += *l as usize;
535 }
536
537 if total_bytes + len_decoder.get_offset() > data.len() {
538 return Err(ParquetError::General(
539 "Insufficient delta length byte array bytes".to_string(),
540 ));
541 }
542
543 Ok(Self {
544 lengths,
545 data,
546 validate_utf8,
547 length_offset: 0,
548 data_offset: len_decoder.get_offset(),
549 })
550 }
551
552 fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
553 let to_read = len.min(self.lengths.len() - self.length_offset);
554 output.views.reserve(to_read);
555
556 let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
557
558 let bytes = Buffer::from(self.data.clone());
560 let block_id = output.append_block(bytes);
561
562 let mut current_offset = self.data_offset;
563 let initial_offset = current_offset;
564 for length in src_lengths {
565 unsafe { output.append_view_unchecked(block_id, current_offset as u32, *length as u32) }
570
571 current_offset += *length as usize;
572 }
573
574 if self.validate_utf8 {
576 check_valid_utf8(&self.data[initial_offset..current_offset])?;
577 }
578
579 self.data_offset = current_offset;
580 self.length_offset += to_read;
581
582 Ok(to_read)
583 }
584
585 fn skip(&mut self, to_skip: usize) -> Result<usize> {
586 let remain_values = self.lengths.len() - self.length_offset;
587 let to_skip = remain_values.min(to_skip);
588
589 let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
590 let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
591
592 self.data_offset += total_bytes;
593 self.length_offset += to_skip;
594 Ok(to_skip)
595 }
596}
597
598pub struct ByteViewArrayDecoderDelta {
600 decoder: DeltaByteArrayDecoder,
601 validate_utf8: bool,
602}
603
604impl ByteViewArrayDecoderDelta {
605 fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
606 Ok(Self {
607 decoder: DeltaByteArrayDecoder::new(data)?,
608 validate_utf8,
609 })
610 }
611
612 fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
621 output.views.reserve(len.min(self.decoder.remaining()));
622
623 let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);
625
626 let buffer_id = output.buffers.len() as u32;
627
628 let read = if !self.validate_utf8 {
629 self.decoder.read(len, |bytes| {
630 let offset = array_buffer.len();
631 let view = make_view(bytes, buffer_id, offset as u32);
632 if bytes.len() > 12 {
633 array_buffer.extend_from_slice(bytes);
635 }
636
637 unsafe {
641 output.append_raw_view_unchecked(&view);
642 }
643 Ok(())
644 })?
645 } else {
646 let mut utf8_validation_buffer = Vec::with_capacity(4096);
650
651 let v = self.decoder.read(len, |bytes| {
652 let offset = array_buffer.len();
653 let view = make_view(bytes, buffer_id, offset as u32);
654 if bytes.len() > 12 {
655 array_buffer.extend_from_slice(bytes);
657 } else {
658 utf8_validation_buffer.extend_from_slice(bytes);
659 }
660
661 unsafe {
666 output.append_raw_view_unchecked(&view);
667 }
668 Ok(())
669 })?;
670 check_valid_utf8(&array_buffer)?;
671 check_valid_utf8(&utf8_validation_buffer)?;
672 v
673 };
674
675 let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
676 assert_eq!(actual_block_id, buffer_id);
677 Ok(read)
678 }
679
680 fn skip(&mut self, to_skip: usize) -> Result<usize> {
681 self.decoder.skip(to_skip)
682 }
683}
684
#[cfg(test)]
mod tests {
    use arrow_array::StringViewArray;
    use arrow_buffer::Buffer;

    use crate::{
        arrow::{
            array_reader::test_util::{byte_array_all_encodings, encode_byte_array, utf8_column},
            buffer::view_buffer::ViewBuffer,
            record_reader::buffer::ValuesBuffer,
        },
        basic::Encoding,
        column::reader::decoder::ColumnValueDecoder,
        data_type::ByteArray,
    };

    use super::*;

    /// Decodes the same four strings from every supported encoding, pads in
    /// nulls and checks the resulting `StringViewArray`.
    #[test]
    fn test_byte_array_string_view_decoder() {
        let inputs = vec!["hello", "world", "large payload over 12 bytes", "b"];
        let (pages, encoded_dictionary) = byte_array_all_encodings(inputs);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        // Validity mask placing the four decoded values among five nulls.
        let valid = [false, false, true, true, false, true, true, false, false];
        let expected = vec![
            None,
            None,
            Some("hello"),
            Some("world"),
            None,
            Some("large payload over 12 bytes"),
            Some("b"),
            None,
            None,
        ];

        for (encoding, page) in pages {
            let mut output = ViewBuffer::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            // (values requested, values expected back); the last request
            // hits an exhausted page.
            for (requested, expected_read) in [(1, 1), (1, 1), (2, 2), (4, 0)] {
                assert_eq!(decoder.read(&mut output, requested).unwrap(), expected_read);
            }

            assert_eq!(output.views.len(), 4);

            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), &ArrowType::Utf8View);
            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

            assert_eq!(strings.iter().collect::<Vec<_>>(), expected);
        }
    }

    /// Consecutive reads from one PLAIN page must keep sharing a single
    /// block in the output buffer.
    #[test]
    fn test_byte_view_array_plain_decoder_reuse_buffer() {
        let values: Vec<ByteArray> = vec!["hello", "world", "large payload over 12 bytes", "b"]
            .into_iter()
            .map(|x| x.into())
            .collect();
        let page = encode_byte_array(Encoding::PLAIN, &values);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        let mut view_buffer = ViewBuffer::default();
        decoder.set_data(Encoding::PLAIN, page, 4, None).unwrap();
        decoder.read(&mut view_buffer, 1).unwrap();
        decoder.read(&mut view_buffer, 1).unwrap();
        assert_eq!(view_buffer.buffers.len(), 1);

        decoder.read(&mut view_buffer, 1).unwrap();
        assert_eq!(view_buffer.buffers.len(), 1);
    }
}