// parquet/arrow/array_reader/byte_view_array.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::any::Any;

use arrow_array::{ArrayRef, builder::make_view};
use arrow_buffer::Buffer;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;

use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
use crate::arrow::buffer::view_buffer::ViewBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::Int32Type;
use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::utf8::check_valid_utf8;

38/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
39pub fn make_byte_view_array_reader(
40    pages: Box<dyn PageIterator>,
41    column_desc: ColumnDescPtr,
42    arrow_type: Option<ArrowType>,
43) -> Result<Box<dyn ArrayReader>> {
44    // Check if Arrow type is specified, else create it from Parquet type
45    let data_type = match arrow_type {
46        Some(t) => t,
47        None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
48            ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
49            _ => ArrowType::BinaryView,
50        },
51    };
52
53    match data_type {
54        ArrowType::BinaryView | ArrowType::Utf8View => {
55            let reader = GenericRecordReader::new(column_desc);
56            Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
57        }
58
59        _ => Err(general_err!(
60            "invalid data type for byte array reader read to view type - {}",
61            data_type
62        )),
63    }
64}
65
/// An [`ArrayReader`] for variable length byte arrays
struct ByteViewArrayReader {
    /// Output Arrow type: [`ArrowType::BinaryView`] or [`ArrowType::Utf8View`]
    data_type: ArrowType,
    /// Source of Parquet pages for this column
    pages: Box<dyn PageIterator>,
    /// Definition levels captured by the most recent `consume_batch` call
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels captured by the most recent `consume_batch` call
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates decoded views between `consume_batch` calls
    record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
}
74
75impl ByteViewArrayReader {
76    fn new(
77        pages: Box<dyn PageIterator>,
78        data_type: ArrowType,
79        record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
80    ) -> Self {
81        Self {
82            data_type,
83            pages,
84            def_levels_buffer: None,
85            rep_levels_buffer: None,
86            record_reader,
87        }
88    }
89}
90
impl ArrayReader for ByteViewArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Buffers up to `batch_size` records from the underlying pages,
    /// returning the number of records read
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    /// Converts everything buffered so far into a single view array and
    /// resets the record reader for the next batch
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        // Take ownership of the accumulated views, null bitmap and levels
        // before resetting; the levels are retained on `self` so that
        // `get_def_levels` / `get_rep_levels` can serve them afterwards
        let buffer = self.record_reader.consume_record_data();
        let null_buffer = self.record_reader.consume_bitmap_buffer();
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        let array = buffer.into_array(null_buffer, &self.data_type);

        Ok(array)
    }

    /// Skips up to `num_records` records, returning the number skipped
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}
128
/// A [`ColumnValueDecoder`] for variable length byte arrays
struct ByteViewArrayColumnValueDecoder {
    /// Decoded dictionary page, if any, shared by subsequent data pages
    dict: Option<ViewBuffer>,
    /// Decoder for the current data page, installed by `set_data`
    decoder: Option<ByteViewArrayDecoder>,
    /// Whether decoded values must be validated as UTF-8
    validate_utf8: bool,
}
135
impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
    type Buffer = ViewBuffer;

    fn new(desc: &ColumnDescPtr) -> Self {
        // Only columns declared as UTF8 strings require UTF-8 validation
        let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
        Self {
            dict: None,
            decoder: None,
            validate_utf8,
        }
    }

    /// Eagerly decodes a dictionary page into a [`ViewBuffer`] so later data
    /// pages can be decoded by index lookup
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        if !matches!(
            encoding,
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            return Err(nyi_err!(
                "Invalid/Unsupported encoding type for dictionary: {}",
                encoding
            ));
        }

        // Dictionary values are stored in PLAIN layout irrespective of which
        // of the accepted encodings was declared, so a plain decoder is used
        let mut buffer = ViewBuffer::default();
        let mut decoder = ByteViewArrayDecoderPlain::new(
            buf,
            num_values as usize,
            Some(num_values as usize),
            self.validate_utf8,
        );
        // usize::MAX: decode every value in the dictionary page in one call
        decoder.read(&mut buffer, usize::MAX)?;
        self.dict = Some(buffer);
        Ok(())
    }

    /// Installs a decoder for a new data page
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        self.decoder = Some(ByteViewArrayDecoder::new(
            encoding,
            data,
            num_levels,
            num_values,
            self.validate_utf8,
        )?);
        Ok(())
    }

    /// Reads up to `num_values` values into `out`; errors if `set_data` has
    /// not been called for the current page
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        let decoder = self
            .decoder
            .as_mut()
            .ok_or_else(|| general_err!("no decoder set"))?;

        decoder.read(out, num_values, self.dict.as_ref())
    }

    /// Skips up to `num_values` values; errors if `set_data` has not been called
    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        let decoder = self
            .decoder
            .as_mut()
            .ok_or_else(|| general_err!("no decoder set"))?;

        decoder.skip(num_values, self.dict.as_ref())
    }
}
212
/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`]
pub enum ByteViewArrayDecoder {
    /// Values encoded with [`Encoding::PLAIN`]
    Plain(ByteViewArrayDecoderPlain),
    /// Dictionary indices ([`Encoding::RLE_DICTIONARY`] / [`Encoding::PLAIN_DICTIONARY`])
    Dictionary(ByteViewArrayDecoderDictionary),
    /// Values encoded with [`Encoding::DELTA_LENGTH_BYTE_ARRAY`]
    DeltaLength(ByteViewArrayDecoderDeltaLength),
    /// Values encoded with [`Encoding::DELTA_BYTE_ARRAY`]
    DeltaByteArray(ByteViewArrayDecoderDelta),
}
220
221impl ByteViewArrayDecoder {
222    pub fn new(
223        encoding: Encoding,
224        data: Bytes,
225        num_levels: usize,
226        num_values: Option<usize>,
227        validate_utf8: bool,
228    ) -> Result<Self> {
229        let decoder = match encoding {
230            Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new(
231                data,
232                num_levels,
233                num_values,
234                validate_utf8,
235            )),
236            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
237                ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
238                    data, num_levels, num_values,
239                )?)
240            }
241            Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
242                ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
243            ),
244            Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
245                ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
246            ),
247            _ => {
248                return Err(general_err!(
249                    "unsupported encoding for byte array: {}",
250                    encoding
251                ));
252            }
253        };
254
255        Ok(decoder)
256    }
257
258    /// Read up to `len` values to `out` with the optional dictionary
259    pub fn read(
260        &mut self,
261        out: &mut ViewBuffer,
262        len: usize,
263        dict: Option<&ViewBuffer>,
264    ) -> Result<usize> {
265        match self {
266            ByteViewArrayDecoder::Plain(d) => d.read(out, len),
267            ByteViewArrayDecoder::Dictionary(d) => {
268                let dict = dict
269                    .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
270                d.read(out, dict, len)
271            }
272            ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
273            ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
274        }
275    }
276
277    /// Skip `len` values
278    pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
279        match self {
280            ByteViewArrayDecoder::Plain(d) => d.skip(len),
281            ByteViewArrayDecoder::Dictionary(d) => {
282                let dict = dict
283                    .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
284                d.skip(dict, len)
285            }
286            ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
287            ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
288        }
289    }
290}
291
/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderPlain {
    /// Raw page data: a sequence of 4-byte little-endian lengths, each
    /// followed by that many value bytes
    buf: Buffer,
    /// Byte offset into `buf` of the next unread value
    offset: usize,

    /// Whether decoded values must be validated as UTF-8
    validate_utf8: bool,

    /// This is a maximum as the null count is not always known, e.g. value data from
    /// a v1 data page
    max_remaining_values: usize,
}
303
impl ByteViewArrayDecoderPlain {
    /// Creates a decoder over `buf` expecting at most
    /// `num_values.unwrap_or(num_levels)` values
    pub fn new(
        buf: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
        validate_utf8: bool,
    ) -> Self {
        Self {
            buf: Buffer::from(buf),
            offset: 0,
            // fall back to the level count when the value count is unknown
            // (e.g. a v1 data page), hence "max" remaining values
            max_remaining_values: num_values.unwrap_or(num_levels),
            validate_utf8,
        }
    }

    /// Reads up to `len` values into `output`, returning the number read
    pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        // Monomorphize on the validation flag so the non-UTF8 path pays no
        // validation overhead
        if self.validate_utf8 {
            self.read_impl::<true>(output, len)
        } else {
            self.read_impl::<false>(output, len)
        }
    }

    fn read_impl<const VALIDATE_UTF8: bool>(
        &mut self,
        output: &mut ViewBuffer,
        len: usize,
    ) -> Result<usize> {
        // avoid creating a new buffer if the last buffer is the same as the current buffer
        // This is especially useful when row-level filtering is applied, where we call lots of small `read` over the same buffer.
        let block_id = {
            if output.buffers.last().is_some_and(|x| x.ptr_eq(&self.buf)) {
                output.buffers.len() as u32 - 1
            } else {
                output.append_block(self.buf.clone())
            }
        };

        let to_read = len.min(self.max_remaining_values);

        let buf: &[u8] = self.buf.as_ref();
        let buf_len = buf.len();
        let mut end_offset = self.offset;
        let mut utf8_validation_begin = end_offset;

        output.views.reserve(to_read);

        // Safety: we reserved enough space in output.views
        // and we will only write up to to_read views / track how many views we wrote.
        // Ideally, we would use `Vec::extend` here, but this generates sub-optimal code.
        let views_ptr = output.views.as_mut_ptr().wrapping_add(output.views.len());
        for i in 0..to_read {
            // each value is a 4-byte little-endian length followed by the bytes
            let start_offset = end_offset + 4;

            if start_offset > buf_len {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }

            // Safety: we have checked that start_offset <= buf_len
            let len = u32::from_le_bytes(
                unsafe { buf.get_unchecked(end_offset..start_offset) }
                    .try_into()
                    .unwrap(),
            );

            end_offset = start_offset + len as usize;

            if end_offset > buf_len {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }

            if VALIDATE_UTF8 {
                // It seems you are trying to understand what's going on here, take a breath and be patient.
                // Utf-8 validation is a non-trivial task, here are some background facts:
                // (1) Validating one 2048-byte string is much faster than validating 128 of 16-byte string.
                //     As shown in https://github.com/apache/arrow-rs/pull/6009#issuecomment-2211174229
                //     Potentially because the SIMD operations favor longer strings.
                // (2) Practical strings are short, 99% of strings are smaller than 100 bytes, as shown in paper:
                //     https://www.vldb.org/pvldb/vol17/p148-zeng.pdf, Figure 5f.
                // (3) Parquet plain encoding makes utf-8 validation harder,
                //     because it stores the length of each string right before the string.
                //     This means naive utf-8 validation will be slow, because the validation need to skip the length bytes.
                //     I.e., the validation cannot validate the buffer in one pass, but instead, validate strings chunk by chunk.
                //
                // Given the above observations, the goal is to do batch validation as much as possible.
                // The key idea is that if the length is smaller than 128 (99% of the case), then the length bytes are valid utf-8, as reasoned below:
                // If the length is smaller than 128, its 4-byte encoding are [0, 0, 0, len].
                // Each of the byte is a valid ASCII character, so they are valid utf-8.
                // Since they are all smaller than 128, they won't break a utf-8 code point (won't mess with later bytes).
                //
                // The implementation keeps a water mark `utf8_validation_begin` to track the beginning of the buffer that is not validated.
                // If the length is smaller than 128, then we continue to next string.
                // If the length is larger than 128, then we validate the buffer before the length bytes, and move the water mark to the beginning of next string.
                if len >= 128 {
                    // unfortunately, the len bytes may not be valid utf8, we need to wrap up and validate everything before it.
                    check_valid_utf8(unsafe {
                        buf.get_unchecked(utf8_validation_begin..start_offset - 4)
                    })?;
                    // move the cursor to skip the len bytes.
                    utf8_validation_begin = start_offset;
                }
            }

            let view = make_view(
                unsafe { buf.get_unchecked(start_offset..end_offset) },
                block_id,
                start_offset as u32,
            );
            // Safety: views_ptr is valid for writes, and we have reserved enough space.
            unsafe {
                views_ptr.add(i).write(view);
            }
        }

        // Safety: we have written `to_read` views to `views_ptr`
        unsafe {
            output.views.set_len(output.views.len() + to_read);
        }
        if VALIDATE_UTF8 {
            // validate values from the previously validated location up to (but not including)
            // the length of this string
            check_valid_utf8(unsafe { buf.get_unchecked(utf8_validation_begin..end_offset) })?;
        }

        self.offset = end_offset;
        self.max_remaining_values -= to_read;

        Ok(to_read)
    }

    /// Skips up to `to_skip` values by walking the length prefixes,
    /// returning the number actually skipped
    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let to_skip = to_skip.min(self.max_remaining_values);
        let mut skip = 0;
        let buf: &[u8] = self.buf.as_ref();

        while self.offset < self.buf.len() && skip != to_skip {
            if self.offset + 4 > buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }
            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
            let len = u32::from_le_bytes(len_bytes) as usize;
            skip += 1;
            // advance past the 4-byte length prefix and the value bytes
            self.offset = self.offset + 4 + len;
        }
        self.max_remaining_values -= skip;
        Ok(skip)
    }
}
452
/// Decoder from [`Encoding::RLE_DICTIONARY`] / [`Encoding::PLAIN_DICTIONARY`]
/// index data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDictionary {
    /// Decodes the dictionary key indices
    decoder: DictIndexDecoder,
}
456
impl ByteViewArrayDecoderDictionary {
    fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
        Ok(Self {
            decoder: DictIndexDecoder::new(data, num_levels, num_values)?,
        })
    }

    /// Reads the next indexes from self.decoder
    /// the indexes are assumed to be indexes into `dict`
    /// the output values are written to output
    ///
    /// Assumptions / Optimization
    /// This function checks if dict.buffers() are the last buffers in `output`, and if so
    /// reuses the dictionary page buffers directly without copying data
    ///
    /// If the dictionary is empty, the buffer contains empty view.
    fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
        if dict.is_empty() || len == 0 {
            return Ok(0);
        }

        // Check if the last few buffers of `output` are the same as the `dict` buffers
        // This is to avoid creating new buffers if the same dictionary is used for multiple `read`
        let need_to_create_new_buffer = {
            if output.buffers.len() >= dict.buffers.len() {
                let offset = output.buffers.len() - dict.buffers.len();
                output.buffers[offset..]
                    .iter()
                    .zip(dict.buffers.iter())
                    .any(|(a, b)| !a.ptr_eq(b))
            } else {
                true
            }
        };

        if need_to_create_new_buffer {
            for b in dict.buffers.iter() {
                output.buffers.push(b.clone());
            }
        }

        // Calculate the offset of the dictionary buffers in the output buffers
        // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
        // then the base_buffer_idx is 5 - 2 = 3
        let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;

        // Invalid keys are recorded here rather than returned immediately so
        // the per-key closure passed to `DictIndexDecoder::read` stays simple;
        // the recorded error is returned after the read completes
        let mut error = None;
        let read = self.decoder.read(len, |keys| {
            if base_buffer_idx == 0 {
                // the dictionary buffers are the last buffers in output, we can directly use the views
                output
                    .views
                    .extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
                        Some(&view) => view,
                        None => {
                            if error.is_none() {
                                error = Some(general_err!("invalid key={} for dictionary", *k));
                            }
                            // placeholder view; the error above is surfaced below
                            0
                        }
                    }));
                Ok(())
            } else {
                output
                    .views
                    .extend(keys.iter().map(|k| match dict.views.get(*k as usize) {
                        Some(&view) => {
                            // the low 32 bits of a view hold the value length
                            let len = view as u32;
                            if len <= 12 {
                                // inlined view: carries no buffer reference to rebase
                                view
                            } else {
                                // rebase the buffer index into `output`'s buffer list
                                let mut view = ByteView::from(view);
                                view.buffer_index += base_buffer_idx;
                                view.into()
                            }
                        }
                        None => {
                            if error.is_none() {
                                error = Some(general_err!("invalid key={} for dictionary", *k));
                            }
                            0
                        }
                    }));
                Ok(())
            }
        })?;
        if let Some(e) = error {
            return Err(e);
        }
        Ok(read)
    }

    /// Skips up to `to_skip` keys; an empty dictionary yields nothing to skip
    fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
        if dict.is_empty() {
            return Ok(0);
        }
        self.decoder.skip(to_skip)
    }
}
556
/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDeltaLength {
    /// All value lengths, decoded eagerly in `new`
    lengths: Vec<i32>,
    /// The page data; value bytes are stored back to back after the lengths
    data: Bytes,
    /// Index into `lengths` of the next value to read
    length_offset: usize,
    /// Byte offset into `data` of the next value to read
    data_offset: usize,
    /// Whether decoded values must be validated as UTF-8
    validate_utf8: bool,
}
565
566impl ByteViewArrayDecoderDeltaLength {
567    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
568        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
569        len_decoder.set_data(data.clone(), 0)?;
570        let values = len_decoder.values_left();
571
572        let mut lengths = vec![0; values];
573        len_decoder.get(&mut lengths)?;
574
575        let mut total_bytes = 0;
576
577        for l in lengths.iter() {
578            if *l < 0 {
579                return Err(ParquetError::General(
580                    "negative delta length byte array length".to_string(),
581                ));
582            }
583            total_bytes += *l as usize;
584        }
585
586        if total_bytes + len_decoder.get_offset() > data.len() {
587            return Err(ParquetError::General(
588                "Insufficient delta length byte array bytes".to_string(),
589            ));
590        }
591
592        Ok(Self {
593            lengths,
594            data,
595            validate_utf8,
596            length_offset: 0,
597            data_offset: len_decoder.get_offset(),
598        })
599    }
600
601    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
602        let to_read = len.min(self.lengths.len() - self.length_offset);
603        output.views.reserve(to_read);
604
605        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];
606
607        // Zero copy convert `bytes::Bytes` into `arrow_buffer::Buffer`
608        let bytes = Buffer::from(self.data.clone());
609        let block_id = output.append_block(bytes);
610
611        let mut current_offset = self.data_offset;
612        let initial_offset = current_offset;
613
614        output.views.extend(src_lengths.iter().map(|length| {
615            let len = *length as u32;
616            let start_offset = current_offset;
617            current_offset += len as usize;
618            // # Safety
619            // The length and offset are guaranteed valid by the entry check in `new`
620            make_view(
621                &self.data[start_offset..start_offset + len as usize],
622                block_id,
623                start_offset as u32,
624            )
625        }));
626
627        // Delta length encoding has continuous strings, we can validate utf8 in one go
628        if self.validate_utf8 {
629            check_valid_utf8(&self.data[initial_offset..current_offset])?;
630        }
631
632        self.data_offset = current_offset;
633        self.length_offset += to_read;
634
635        Ok(to_read)
636    }
637
638    fn skip(&mut self, to_skip: usize) -> Result<usize> {
639        let remain_values = self.lengths.len() - self.length_offset;
640        let to_skip = remain_values.min(to_skip);
641
642        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
643        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
644
645        self.data_offset += total_bytes;
646        self.length_offset += to_skip;
647        Ok(to_skip)
648    }
649}
650
/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDelta {
    /// Reconstructs full values from shared prefixes/suffixes
    decoder: DeltaByteArrayDecoder,
    /// Whether decoded values must be validated as UTF-8
    validate_utf8: bool,
}
656
impl ByteViewArrayDecoderDelta {
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        Ok(Self {
            decoder: DeltaByteArrayDecoder::new(data)?,
            validate_utf8,
        })
    }

    // Unlike other encodings, we need to copy the data.
    //
    //  DeltaByteArray data is stored using shared prefixes/suffixes,
    // which results in potentially non-contiguous
    // strings, while Arrow encodings require contiguous strings
    //
    // <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>

    /// Reads up to `len` values into `output`, returning the number read
    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        output.views.reserve(len.min(self.decoder.remaining()));

        // array buffer only have long strings
        let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);

        // index that the new buffer will occupy once appended at the end
        let buffer_id = output.buffers.len() as u32;

        let read = if !self.validate_utf8 {
            self.decoder.read(len, |bytes| {
                let offset = array_buffer.len();
                let view = make_view(bytes, buffer_id, offset as u32);
                if bytes.len() > 12 {
                    // only copy the data to buffer if the string can not be inlined.
                    array_buffer.extend_from_slice(bytes);
                }

                // # Safety
                // The buffer_id is the last buffer in the output buffers
                // The offset is calculated from the buffer, so it is valid
                unsafe {
                    output.append_raw_view_unchecked(view);
                }
                Ok(())
            })?
        } else {
            // utf8 validation buffer has only short strings. These short
            // strings are inlined into the views but we copy them into a
            // contiguous buffer to accelerate validation.
            let mut utf8_validation_buffer = Vec::with_capacity(4096);

            let v = self.decoder.read(len, |bytes| {
                let offset = array_buffer.len();
                let view = make_view(bytes, buffer_id, offset as u32);
                if bytes.len() > 12 {
                    // only copy the data to buffer if the string can not be inlined.
                    array_buffer.extend_from_slice(bytes);
                } else {
                    utf8_validation_buffer.extend_from_slice(bytes);
                }

                // # Safety
                // The buffer_id is the last buffer in the output buffers
                // The offset is calculated from the buffer, so it is valid
                // Utf-8 validation is done later
                unsafe {
                    output.append_raw_view_unchecked(view);
                }
                Ok(())
            })?;
            // validate the copied long and short strings in bulk
            check_valid_utf8(&array_buffer)?;
            check_valid_utf8(&utf8_validation_buffer)?;
            v
        };

        // append the copied data; the assert confirms it landed at the index
        // the views were built against
        let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
        assert_eq!(actual_block_id, buffer_id);
        Ok(read)
    }

    /// Skips up to `to_skip` values
    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        self.decoder.skip(to_skip)
    }
}
737
#[cfg(test)]
mod tests {
    use arrow_array::StringViewArray;
    use arrow_buffer::Buffer;

    use crate::{
        arrow::{
            array_reader::test_util::{byte_array_all_encodings, encode_byte_array, utf8_column},
            buffer::view_buffer::ViewBuffer,
            record_reader::buffer::ValuesBuffer,
        },
        basic::Encoding,
        column::reader::decoder::ColumnValueDecoder,
        data_type::ByteArray,
    };

    use super::*;

    /// Round-trips a small string column through every supported encoding,
    /// including a value too long to be inlined into a view (> 12 bytes)
    #[test]
    fn test_byte_array_string_view_decoder() {
        let (pages, encoded_dictionary) =
            byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        for (encoding, page) in pages {
            let mut output = ViewBuffer::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            // read in uneven chunks; a read past the end returns 0
            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

            assert_eq!(output.views.len(), 4);

            // interleave nulls around the four decoded values
            let valid = [false, false, true, true, false, true, true, false, false];
            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), &ArrowType::Utf8View);
            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

            assert_eq!(
                strings.iter().collect::<Vec<_>>(),
                vec![
                    None,
                    None,
                    Some("hello"),
                    Some("world"),
                    None,
                    Some("large payload over 12 bytes"),
                    Some("b"),
                    None,
                    None,
                ]
            );
        }
    }

    /// Verifies repeated plain reads over the same page reuse the existing
    /// data buffer rather than appending a duplicate
    #[test]
    fn test_byte_view_array_plain_decoder_reuse_buffer() {
        let byte_array = vec!["hello", "world", "large payload over 12 bytes", "b"];
        let byte_array: Vec<ByteArray> = byte_array.into_iter().map(|x| x.into()).collect();
        let pages = encode_byte_array(Encoding::PLAIN, &byte_array);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        let mut view_buffer = ViewBuffer::default();
        decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();
        decoder.read(&mut view_buffer, 1).unwrap();
        decoder.read(&mut view_buffer, 1).unwrap();
        assert_eq!(view_buffer.buffers.len(), 1);

        decoder.read(&mut view_buffer, 1).unwrap();
        assert_eq!(view_buffer.buffers.len(), 1);
    }
}