parquet/arrow/array_reader/byte_view_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
use crate::arrow::buffer::view_buffer::ViewBuffer;
use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
use crate::arrow::record_reader::GenericRecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::{ConvertedType, Encoding};
use crate::column::page::PageIterator;
use crate::column::reader::decoder::ColumnValueDecoder;
use crate::data_type::Int32Type;
use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::utf8::check_valid_utf8;
use arrow_array::{builder::make_view, ArrayRef};
use arrow_buffer::Buffer;
use arrow_data::ByteView;
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;
use std::any::Any;

/// Returns an [`ArrayReader`] that decodes the provided byte array column to view types.
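///
/// When no Arrow type is supplied, string columns are read as `Utf8View` and all other
/// byte array columns as `BinaryView`, as determined below.
///
/// # Example
///
/// A minimal usage sketch; it assumes `pages` and `column_desc` were obtained from a row
/// group elsewhere (both names are illustrative, not defined in this module):
///
/// ```ignore
/// // Let the reader infer Utf8View / BinaryView from the Parquet type
/// let mut reader = make_byte_view_array_reader(pages, column_desc, None)?;
/// reader.read_records(1024)?; // buffer up to 1024 records
/// let array = reader.consume_batch()?; // StringViewArray or BinaryViewArray as ArrayRef
/// ```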
pub fn make_byte_view_array_reader(
    pages: Box<dyn PageIterator>,
    column_desc: ColumnDescPtr,
    arrow_type: Option<ArrowType>,
) -> Result<Box<dyn ArrayReader>> {
    // Check if Arrow type is specified, else create it from Parquet type
    let data_type = match arrow_type {
        Some(t) => t,
        None => match parquet_to_arrow_field(column_desc.as_ref())?.data_type() {
            ArrowType::Utf8 | ArrowType::Utf8View => ArrowType::Utf8View,
            _ => ArrowType::BinaryView,
        },
    };

    match data_type {
        ArrowType::BinaryView | ArrowType::Utf8View => {
            let reader = GenericRecordReader::new(column_desc);
            Ok(Box::new(ByteViewArrayReader::new(pages, data_type, reader)))
        }

        _ => Err(general_err!(
            "invalid data type for byte array reader read to view type - {}",
            data_type
        )),
    }
}

/// An [`ArrayReader`] for variable length byte arrays
struct ByteViewArrayReader {
    data_type: ArrowType,
    pages: Box<dyn PageIterator>,
    def_levels_buffer: Option<Vec<i16>>,
    rep_levels_buffer: Option<Vec<i16>>,
    record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
}

impl ByteViewArrayReader {
    fn new(
        pages: Box<dyn PageIterator>,
        data_type: ArrowType,
        record_reader: GenericRecordReader<ViewBuffer, ByteViewArrayColumnValueDecoder>,
    ) -> Self {
        Self {
            data_type,
            pages,
            def_levels_buffer: None,
            rep_levels_buffer: None,
            record_reader,
        }
    }
}

impl ArrayReader for ByteViewArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let buffer = self.record_reader.consume_record_data();
        let null_buffer = self.record_reader.consume_bitmap_buffer();
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        let array = buffer.into_array(null_buffer, &self.data_type);

        Ok(array)
    }

    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}

/// A [`ColumnValueDecoder`] for variable length byte arrays
struct ByteViewArrayColumnValueDecoder {
    dict: Option<ViewBuffer>,
    decoder: Option<ByteViewArrayDecoder>,
    validate_utf8: bool,
}

impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
    type Buffer = ViewBuffer;

    fn new(desc: &ColumnDescPtr) -> Self {
        let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
        Self {
            dict: None,
            decoder: None,
            validate_utf8,
        }
    }

    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        if !matches!(
            encoding,
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            return Err(nyi_err!(
                "Invalid/Unsupported encoding type for dictionary: {}",
                encoding
            ));
        }

        let mut buffer = ViewBuffer::default();
        let mut decoder = ByteViewArrayDecoderPlain::new(
            buf,
            num_values as usize,
            Some(num_values as usize),
            self.validate_utf8,
        );
        decoder.read(&mut buffer, usize::MAX)?;
        self.dict = Some(buffer);
        Ok(())
    }

    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        self.decoder = Some(ByteViewArrayDecoder::new(
            encoding,
            data,
            num_levels,
            num_values,
            self.validate_utf8,
        )?);
        Ok(())
    }

    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        let decoder = self
            .decoder
            .as_mut()
            .ok_or_else(|| general_err!("no decoder set"))?;

        decoder.read(out, num_values, self.dict.as_ref())
    }

    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        let decoder = self
            .decoder
            .as_mut()
            .ok_or_else(|| general_err!("no decoder set"))?;

        decoder.skip(num_values, self.dict.as_ref())
    }
}

/// A generic decoder from uncompressed parquet value data to [`ViewBuffer`]
pub enum ByteViewArrayDecoder {
    Plain(ByteViewArrayDecoderPlain),
    Dictionary(ByteViewArrayDecoderDictionary),
    DeltaLength(ByteViewArrayDecoderDeltaLength),
    DeltaByteArray(ByteViewArrayDecoderDelta),
}

impl ByteViewArrayDecoder {
    pub fn new(
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
        validate_utf8: bool,
    ) -> Result<Self> {
        let decoder = match encoding {
            Encoding::PLAIN => ByteViewArrayDecoder::Plain(ByteViewArrayDecoderPlain::new(
                data,
                num_levels,
                num_values,
                validate_utf8,
            )),
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
                ByteViewArrayDecoder::Dictionary(ByteViewArrayDecoderDictionary::new(
                    data, num_levels, num_values,
                ))
            }
            Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteViewArrayDecoder::DeltaLength(
                ByteViewArrayDecoderDeltaLength::new(data, validate_utf8)?,
            ),
            Encoding::DELTA_BYTE_ARRAY => ByteViewArrayDecoder::DeltaByteArray(
                ByteViewArrayDecoderDelta::new(data, validate_utf8)?,
            ),
            _ => {
                return Err(general_err!(
                    "unsupported encoding for byte array: {}",
                    encoding
                ))
            }
        };

        Ok(decoder)
    }

    /// Read up to `len` values to `out` with the optional dictionary
    pub fn read(
        &mut self,
        out: &mut ViewBuffer,
        len: usize,
        dict: Option<&ViewBuffer>,
    ) -> Result<usize> {
        match self {
            ByteViewArrayDecoder::Plain(d) => d.read(out, len),
            ByteViewArrayDecoder::Dictionary(d) => {
                let dict = dict
                    .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
                d.read(out, dict, len)
            }
            ByteViewArrayDecoder::DeltaLength(d) => d.read(out, len),
            ByteViewArrayDecoder::DeltaByteArray(d) => d.read(out, len),
        }
    }

    /// Skip `len` values
    pub fn skip(&mut self, len: usize, dict: Option<&ViewBuffer>) -> Result<usize> {
        match self {
            ByteViewArrayDecoder::Plain(d) => d.skip(len),
            ByteViewArrayDecoder::Dictionary(d) => {
                let dict = dict
                    .ok_or_else(|| general_err!("dictionary required for dictionary encoding"))?;
                d.skip(dict, len)
            }
            ByteViewArrayDecoder::DeltaLength(d) => d.skip(len),
            ByteViewArrayDecoder::DeltaByteArray(d) => d.skip(len),
        }
    }
}

/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderPlain {
    buf: Buffer,
    offset: usize,

    validate_utf8: bool,

    /// This is a maximum as the null count is not always known, e.g. value data from
    /// a v1 data page
    max_remaining_values: usize,
}

impl ByteViewArrayDecoderPlain {
    pub fn new(
        buf: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
        validate_utf8: bool,
    ) -> Self {
        Self {
            buf: Buffer::from(buf),
            offset: 0,
            max_remaining_values: num_values.unwrap_or(num_levels),
            validate_utf8,
        }
    }

    pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        // Avoid creating a new output block if the last block already refers to this page's buffer.
        // This is especially useful when row-level filtering is applied, where we make many small `read` calls over the same buffer.
        let block_id = {
            if output.buffers.last().is_some_and(|x| x.ptr_eq(&self.buf)) {
                output.buffers.len() as u32 - 1
            } else {
                output.append_block(self.buf.clone())
            }
        };

        let to_read = len.min(self.max_remaining_values);

        let buf = self.buf.as_ref();
        let mut read = 0;
        output.views.reserve(to_read);

        let mut utf8_validation_begin = self.offset;
        while self.offset < self.buf.len() && read != to_read {
            if self.offset + 4 > self.buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }
            let len_bytes: [u8; 4] = unsafe {
                buf.get_unchecked(self.offset..self.offset + 4)
                    .try_into()
                    .unwrap()
            };
            let len = u32::from_le_bytes(len_bytes);

            let start_offset = self.offset + 4;
            let end_offset = start_offset + len as usize;
            if end_offset > buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }

            if self.validate_utf8 {
                // It seems you are trying to understand what's going on here; take a breath and be patient.
                // Utf-8 validation is a non-trivial task, so here are some background facts:
                // (1) Validating one 2048-byte string is much faster than validating 128 16-byte strings,
                //     as shown in https://github.com/apache/arrow-rs/pull/6009#issuecomment-2211174229,
                //     potentially because the SIMD operations favor longer strings.
                // (2) Practical strings are short: 99% of strings are smaller than 100 bytes, as shown in the paper
                //     https://www.vldb.org/pvldb/vol17/p148-zeng.pdf, Figure 5f.
                // (3) Parquet plain encoding makes utf-8 validation harder,
                //     because it stores the length of each string right before the string.
                //     This means naive utf-8 validation will be slow, because the validation needs to skip the length bytes.
                //     I.e., the validation cannot cover the buffer in one pass, but must instead validate strings chunk by chunk.
                //
                // Given the above observations, the goal is to batch validation as much as possible.
                // The key idea is that if the length is smaller than 128 (99% of the cases), then the length bytes are themselves valid utf-8, as reasoned below:
                // If the length is smaller than 128, its 4-byte little-endian encoding is [len, 0, 0, 0].
                // Each of these bytes is a valid ASCII character, so they are valid utf-8.
                // Since they are all smaller than 128, they won't break a utf-8 code point (won't mess with later bytes).
                //
                // The implementation keeps a watermark `utf8_validation_begin` to track the beginning of the buffer that has not yet been validated.
                // If the length is smaller than 128, we simply continue to the next string.
                // If the length is 128 or larger, we validate the buffer up to the length bytes, and move the watermark to the beginning of the next string.
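                //
                // For example (an illustration), decoding ["hi", <700-byte string>, "ok"]:
                // - "hi" has length 2 (< 128), so its length bytes [2, 0, 0, 0] and its payload
                //   are left for batch validation and the loop simply moves on.
                // - the 700-byte string has length bytes that are not ASCII, so everything from
                //   `utf8_validation_begin` up to (but not including) those length bytes is
                //   validated now, and the watermark jumps to the start of its payload.
                // - "ok" is short again, so it (together with the preceding long payload) is
                //   validated by the final check after the loop.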
                if len < 128 {
                    // fast path: the len bytes are valid utf8, move on to the next string.
                } else {
                    // unfortunately, the len bytes may not be valid utf8; wrap up and validate everything before them.
                    check_valid_utf8(unsafe {
                        buf.get_unchecked(utf8_validation_begin..self.offset)
                    })?;
                    // move the watermark past the len bytes.
                    utf8_validation_begin = start_offset;
                }
            }

            unsafe {
                output.append_view_unchecked(block_id, start_offset as u32, len);
            }
            self.offset = end_offset;
            read += 1;
        }

        // validate the last part of the buffer
        if self.validate_utf8 {
            check_valid_utf8(unsafe { buf.get_unchecked(utf8_validation_begin..self.offset) })?;
        }

        self.max_remaining_values -= to_read;
        Ok(to_read)
    }

    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let to_skip = to_skip.min(self.max_remaining_values);
        let mut skip = 0;
        let buf = self.buf.as_ref();

        while self.offset < self.buf.len() && skip != to_skip {
            if self.offset + 4 > buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }
            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
            let len = u32::from_le_bytes(len_bytes) as usize;
            skip += 1;
            self.offset = self.offset + 4 + len;
        }
        self.max_remaining_values -= skip;
        Ok(skip)
    }
}

pub struct ByteViewArrayDecoderDictionary {
    decoder: DictIndexDecoder,
}

impl ByteViewArrayDecoderDictionary {
    fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
        Self {
            decoder: DictIndexDecoder::new(data, num_levels, num_values),
        }
    }

    /// Reads the next indexes from `self.decoder`.
    /// The indexes are assumed to be indexes into `dict`,
    /// and the decoded values are written to `output`.
    ///
    /// Assumptions / Optimization
    /// This function checks if `dict.buffers()` are the last buffers in `output`, and if so
    /// reuses the dictionary page buffers directly without copying data.
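    ///
    /// For example, if `dict` has a single data buffer and `output` already ends with that
    /// same buffer, no new buffer is appended; long views only have their `buffer_index`
    /// shifted by `base_buffer_idx`, while inlined views (<= 12 bytes) are copied verbatim.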
    fn read(&mut self, output: &mut ViewBuffer, dict: &ViewBuffer, len: usize) -> Result<usize> {
        if dict.is_empty() || len == 0 {
            return Ok(0);
        }

        // Check if the last few buffers of `output` are the same as the `dict` buffers.
        // This avoids creating new buffers when the same dictionary is used across multiple `read` calls.
        let need_to_create_new_buffer = {
            if output.buffers.len() >= dict.buffers.len() {
                let offset = output.buffers.len() - dict.buffers.len();
                output.buffers[offset..]
                    .iter()
                    .zip(dict.buffers.iter())
                    .any(|(a, b)| !a.ptr_eq(b))
            } else {
                true
            }
        };

        if need_to_create_new_buffer {
            for b in dict.buffers.iter() {
                output.buffers.push(b.clone());
            }
        }

        // Calculate the offset of the dictionary buffers in the output buffers
        // For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
        // then the base_buffer_idx is 5 - 2 = 3
        let base_buffer_idx = output.buffers.len() as u32 - dict.buffers.len() as u32;

        self.decoder.read(len, |keys| {
            for k in keys {
                let view = dict
                    .views
                    .get(*k as usize)
                    .ok_or_else(|| general_err!("invalid key={} for dictionary", *k))?;
                let len = *view as u32;
                if len <= 12 {
                    // directly append the view if it is inlined
                    // Safety: the view is from the dictionary, so it is valid
                    unsafe {
                        output.append_raw_view_unchecked(view);
                    }
                } else {
                    // correct the buffer index and append the view
                    let mut view = ByteView::from(*view);
                    view.buffer_index += base_buffer_idx;
                    // Safety: the view is from the dictionary,
                    // and we corrected the buffer index to point into the output buffers, so it is valid
                    unsafe {
                        output.append_raw_view_unchecked(&view.into());
                    }
                }
            }
            Ok(())
        })
    }

    fn skip(&mut self, dict: &ViewBuffer, to_skip: usize) -> Result<usize> {
        if dict.is_empty() {
            return Ok(0);
        }
        self.decoder.skip(to_skip)
    }
}

/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDeltaLength {
    lengths: Vec<i32>,
    data: Bytes,
    length_offset: usize,
    data_offset: usize,
    validate_utf8: bool,
}

impl ByteViewArrayDecoderDeltaLength {
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
        len_decoder.set_data(data.clone(), 0)?;
        let values = len_decoder.values_left();

        let mut lengths = vec![0; values];
        len_decoder.get(&mut lengths)?;

        let mut total_bytes = 0;

        for l in lengths.iter() {
            if *l < 0 {
                return Err(ParquetError::General(
                    "negative delta length byte array length".to_string(),
                ));
            }
            total_bytes += *l as usize;
        }

        if total_bytes + len_decoder.get_offset() > data.len() {
            return Err(ParquetError::General(
                "Insufficient delta length byte array bytes".to_string(),
            ));
        }

        Ok(Self {
            lengths,
            data,
            validate_utf8,
            length_offset: 0,
            data_offset: len_decoder.get_offset(),
        })
    }

    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        let to_read = len.min(self.lengths.len() - self.length_offset);
        output.views.reserve(to_read);

        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];

        // Zero copy convert `bytes::Bytes` into `arrow_buffer::Buffer`
        let bytes = Buffer::from(self.data.clone());
        let block_id = output.append_block(bytes);

        let mut current_offset = self.data_offset;
        let initial_offset = current_offset;
        for length in src_lengths {
            // # Safety
            // The length is from the delta length decoder, so it is valid
            // `current_offset` is calculated from the lengths, so it is valid
            // `current_offset + length` is guaranteed to be within the bounds of `data`, as checked in `new`
            unsafe { output.append_view_unchecked(block_id, current_offset as u32, *length as u32) }

            current_offset += *length as usize;
        }

        // Delta length encoding stores the string bytes contiguously, so we can validate utf8 in one go
        if self.validate_utf8 {
            check_valid_utf8(&self.data[initial_offset..current_offset])?;
        }

        self.data_offset = current_offset;
        self.length_offset += to_read;

        Ok(to_read)
    }

    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let remain_values = self.lengths.len() - self.length_offset;
        let to_skip = remain_values.min(to_skip);

        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();

        self.data_offset += total_bytes;
        self.length_offset += to_skip;
        Ok(to_skip)
    }
}

/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`ViewBuffer`]
pub struct ByteViewArrayDecoderDelta {
    decoder: DeltaByteArrayDecoder,
    validate_utf8: bool,
}

impl ByteViewArrayDecoderDelta {
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        Ok(Self {
            decoder: DeltaByteArrayDecoder::new(data)?,
            validate_utf8,
        })
    }

    // Unlike other encodings, we need to copy the data.
    //
    // DeltaByteArray data is stored using shared prefixes/suffixes,
    // which results in potentially non-contiguous
    // strings, while Arrow encodings require contiguous strings.
    //
    // <https://parquet.apache.org/docs/file-format/data-pages/encodings/#delta-strings-delta_byte_array--7>
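    //
    // For example (illustrative values), ["apple", "applet", "apply"] could be stored as
    // prefix lengths [0, 5, 4] plus suffixes ["apple", "t", "y"]; each value has to be
    // re-materialized from its predecessor before it can be appended to the output buffer.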

    fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
        output.views.reserve(len.min(self.decoder.remaining()));

        // the array buffer only holds long strings (those that cannot be inlined)
        let mut array_buffer: Vec<u8> = Vec::with_capacity(4096);

        let buffer_id = output.buffers.len() as u32;

        let read = if !self.validate_utf8 {
            self.decoder.read(len, |bytes| {
                let offset = array_buffer.len();
                let view = make_view(bytes, buffer_id, offset as u32);
                if bytes.len() > 12 {
                    // only copy the data to buffer if the string can not be inlined.
                    array_buffer.extend_from_slice(bytes);
                }

                // # Safety
                // The buffer_id is the last buffer in the output buffers
                // The offset is calculated from the buffer, so it is valid
                unsafe {
                    output.append_raw_view_unchecked(&view);
                }
                Ok(())
            })?
        } else {
            // the utf8 validation buffer holds only short strings. These short
            // strings are inlined into the views, but we copy them into a
            // contiguous buffer to accelerate validation.
            let mut utf8_validation_buffer = Vec::with_capacity(4096);

            let v = self.decoder.read(len, |bytes| {
                let offset = array_buffer.len();
                let view = make_view(bytes, buffer_id, offset as u32);
                if bytes.len() > 12 {
                    // only copy the data to buffer if the string can not be inlined.
                    array_buffer.extend_from_slice(bytes);
                } else {
                    utf8_validation_buffer.extend_from_slice(bytes);
                }

                // # Safety
                // The buffer_id is the last buffer in the output buffers
                // The offset is calculated from the buffer, so it is valid
                // Utf-8 validation is done later
                unsafe {
                    output.append_raw_view_unchecked(&view);
                }
                Ok(())
            })?;
            check_valid_utf8(&array_buffer)?;
            check_valid_utf8(&utf8_validation_buffer)?;
            v
        };

        let actual_block_id = output.append_block(Buffer::from_vec(array_buffer));
        assert_eq!(actual_block_id, buffer_id);
        Ok(read)
    }

    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        self.decoder.skip(to_skip)
    }
}

#[cfg(test)]
mod tests {
    use arrow_array::StringViewArray;
    use arrow_buffer::Buffer;

    use crate::{
        arrow::{
            array_reader::test_util::{byte_array_all_encodings, encode_byte_array, utf8_column},
            buffer::view_buffer::ViewBuffer,
            record_reader::buffer::ValuesBuffer,
        },
        basic::Encoding,
        column::reader::decoder::ColumnValueDecoder,
        data_type::ByteArray,
    };

    use super::*;

    #[test]
    fn test_byte_array_string_view_decoder() {
        let (pages, encoded_dictionary) =
            byte_array_all_encodings(vec!["hello", "world", "large payload over 12 bytes", "b"]);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        for (encoding, page) in pages {
            let mut output = ViewBuffer::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

            assert_eq!(output.views.len(), 4);

            let valid = [false, false, true, true, false, true, true, false, false];
            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), &ArrowType::Utf8View);
            let strings = array.as_any().downcast_ref::<StringViewArray>().unwrap();

            assert_eq!(
                strings.iter().collect::<Vec<_>>(),
                vec![
                    None,
                    None,
                    Some("hello"),
                    Some("world"),
                    None,
                    Some("large payload over 12 bytes"),
                    Some("b"),
                    None,
                    None,
                ]
            );
        }
    }

    #[test]
    fn test_byte_view_array_plain_decoder_reuse_buffer() {
        let byte_array = vec!["hello", "world", "large payload over 12 bytes", "b"];
        let byte_array: Vec<ByteArray> = byte_array.into_iter().map(|x| x.into()).collect();
        let pages = encode_byte_array(Encoding::PLAIN, &byte_array);

        let column_desc = utf8_column();
        let mut decoder = ByteViewArrayColumnValueDecoder::new(&column_desc);

        let mut view_buffer = ViewBuffer::default();
        decoder.set_data(Encoding::PLAIN, pages, 4, None).unwrap();
        decoder.read(&mut view_buffer, 1).unwrap();
        decoder.read(&mut view_buffer, 1).unwrap();
        assert_eq!(view_buffer.buffers.len(), 1);

        decoder.read(&mut view_buffer, 1).unwrap();
        assert_eq!(view_buffer.buffers.len(), 1);
    }
}