parquet/arrow/array_reader/byte_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::buffer::bit_util::sign_extend_be;
20use crate::arrow::buffer::offset_buffer::OffsetBuffer;
21use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
22use crate::arrow::record_reader::GenericRecordReader;
23use crate::arrow::schema::parquet_to_arrow_field;
24use crate::basic::{ConvertedType, Encoding};
25use crate::column::page::PageIterator;
26use crate::column::reader::decoder::ColumnValueDecoder;
27use crate::data_type::Int32Type;
28use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
29use crate::errors::{ParquetError, Result};
30use crate::schema::types::ColumnDescPtr;
31use arrow_array::{
32    Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, OffsetSizeTrait,
33};
34use arrow_buffer::i256;
35use arrow_schema::DataType as ArrowType;
36use bytes::Bytes;
37use std::any::Any;
38use std::sync::Arc;
39
40/// Returns an [`ArrayReader`] that decodes the provided byte array column
41pub fn make_byte_array_reader(
42    pages: Box<dyn PageIterator>,
43    column_desc: ColumnDescPtr,
44    arrow_type: Option<ArrowType>,
45) -> Result<Box<dyn ArrayReader>> {
46    // Check if Arrow type is specified, else create it from Parquet type
47    let data_type = match arrow_type {
48        Some(t) => t,
49        None => parquet_to_arrow_field(column_desc.as_ref())?
50            .data_type()
51            .clone(),
52    };
53
54    match data_type {
55        ArrowType::Binary
56        | ArrowType::Utf8
57        | ArrowType::Decimal128(_, _)
58        | ArrowType::Decimal256(_, _) => {
59            let reader = GenericRecordReader::new(column_desc);
60            Ok(Box::new(ByteArrayReader::<i32>::new(
61                pages, data_type, reader,
62            )))
63        }
64        ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
65            let reader = GenericRecordReader::new(column_desc);
66            Ok(Box::new(ByteArrayReader::<i64>::new(
67                pages, data_type, reader,
68            )))
69        }
70        _ => Err(general_err!(
71            "invalid data type for byte array reader - {}",
72            data_type
73        )),
74    }
75}
76
/// An [`ArrayReader`] for variable length byte arrays
struct ByteArrayReader<I: OffsetSizeTrait> {
    /// The Arrow type produced by [`ArrayReader::consume_batch`]
    data_type: ArrowType,
    /// Source of column pages to decode
    pages: Box<dyn PageIterator>,
    /// Definition levels from the most recently consumed batch, if any
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels from the most recently consumed batch, if any
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates decoded values into an [`OffsetBuffer`] with offset type `I`
    record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
}
85
86impl<I: OffsetSizeTrait> ByteArrayReader<I> {
87    fn new(
88        pages: Box<dyn PageIterator>,
89        data_type: ArrowType,
90        record_reader: GenericRecordReader<OffsetBuffer<I>, ByteArrayColumnValueDecoder<I>>,
91    ) -> Self {
92        Self {
93            data_type,
94            pages,
95            def_levels_buffer: None,
96            rep_levels_buffer: None,
97            record_reader,
98        }
99    }
100}
101
impl<I: OffsetSizeTrait> ArrayReader for ByteArrayReader<I> {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads up to `batch_size` records from the page iterator into the
    /// internal record reader, returning the number of records read
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    /// Converts the records accumulated by `read_records` into an [`ArrayRef`]
    /// of `self.data_type`, resetting the record reader for the next batch
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        // Take ownership of the accumulated values, null bitmap, and levels
        // before resetting the record reader
        let buffer = self.record_reader.consume_record_data();
        let null_buffer = self.record_reader.consume_bitmap_buffer();
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        let array: ArrayRef = match self.data_type {
            // Apply conversion to all elements regardless of null slots as the conversions
            // are infallible. This improves performance by avoiding a branch in the inner
            // loop (see docs for `PrimitiveArray::from_unary`).
            ArrowType::Decimal128(p, s) => {
                // Decimals are stored as big-endian byte arrays; decode as
                // binary first, then sign-extend each value to 128 bits
                let array = buffer.into_array(null_buffer, ArrowType::Binary);
                let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
                // Null slots will have 0 length, so we need to check for that in the lambda
                // or sign_extend_be will panic.
                let decimal = Decimal128Array::from_unary(binary, |x| match x.len() {
                    0 => i128::default(),
                    _ => i128::from_be_bytes(sign_extend_be(x)),
                })
                .with_precision_and_scale(p, s)?;
                Arc::new(decimal)
            }
            ArrowType::Decimal256(p, s) => {
                let array = buffer.into_array(null_buffer, ArrowType::Binary);
                let binary = array.as_any().downcast_ref::<BinaryArray>().unwrap();
                // Null slots will have 0 length, so we need to check for that in the lambda
                // or sign_extend_be will panic.
                let decimal = Decimal256Array::from_unary(binary, |x| match x.len() {
                    0 => i256::default(),
                    _ => i256::from_be_bytes(sign_extend_be(x)),
                })
                .with_precision_and_scale(p, s)?;
                Arc::new(decimal)
            }
            // All other supported types convert directly from the offset buffer
            _ => buffer.into_array(null_buffer, self.data_type.clone()),
        };

        Ok(array)
    }

    /// Skips over `num_records` records, returning the number skipped
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}
168
/// A [`ColumnValueDecoder`] for variable length byte arrays
struct ByteArrayColumnValueDecoder<I: OffsetSizeTrait> {
    /// Dictionary values decoded from the dictionary page, if any
    dict: Option<OffsetBuffer<I>>,
    /// Decoder for the current data page, set by [`ColumnValueDecoder::set_data`]
    decoder: Option<ByteArrayDecoder>,
    /// Whether decoded values must be validated as UTF-8
    /// (columns with [`ConvertedType::UTF8`])
    validate_utf8: bool,
}
175
176impl<I: OffsetSizeTrait> ColumnValueDecoder for ByteArrayColumnValueDecoder<I> {
177    type Buffer = OffsetBuffer<I>;
178
179    fn new(desc: &ColumnDescPtr) -> Self {
180        let validate_utf8 = desc.converted_type() == ConvertedType::UTF8;
181        Self {
182            dict: None,
183            decoder: None,
184            validate_utf8,
185        }
186    }
187
188    fn set_dict(
189        &mut self,
190        buf: Bytes,
191        num_values: u32,
192        encoding: Encoding,
193        _is_sorted: bool,
194    ) -> Result<()> {
195        if !matches!(
196            encoding,
197            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
198        ) {
199            return Err(nyi_err!(
200                "Invalid/Unsupported encoding type for dictionary: {}",
201                encoding
202            ));
203        }
204
205        let mut buffer = OffsetBuffer::default();
206        let mut decoder = ByteArrayDecoderPlain::new(
207            buf,
208            num_values as usize,
209            Some(num_values as usize),
210            self.validate_utf8,
211        );
212        decoder.read(&mut buffer, usize::MAX)?;
213        self.dict = Some(buffer);
214        Ok(())
215    }
216
217    fn set_data(
218        &mut self,
219        encoding: Encoding,
220        data: Bytes,
221        num_levels: usize,
222        num_values: Option<usize>,
223    ) -> Result<()> {
224        self.decoder = Some(ByteArrayDecoder::new(
225            encoding,
226            data,
227            num_levels,
228            num_values,
229            self.validate_utf8,
230        )?);
231        Ok(())
232    }
233
234    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
235        let decoder = self
236            .decoder
237            .as_mut()
238            .ok_or_else(|| general_err!("no decoder set"))?;
239
240        decoder.read(out, num_values, self.dict.as_ref())
241    }
242
243    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
244        let decoder = self
245            .decoder
246            .as_mut()
247            .ok_or_else(|| general_err!("no decoder set"))?;
248
249        decoder.skip(num_values, self.dict.as_ref())
250    }
251}
252
/// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`]
pub enum ByteArrayDecoder {
    /// Decoder for [`Encoding::PLAIN`] data
    Plain(ByteArrayDecoderPlain),
    /// Decoder for [`Encoding::RLE_DICTIONARY`] / [`Encoding::PLAIN_DICTIONARY`] data
    Dictionary(ByteArrayDecoderDictionary),
    /// Decoder for [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data
    DeltaLength(ByteArrayDecoderDeltaLength),
    /// Decoder for [`Encoding::DELTA_BYTE_ARRAY`] data
    DeltaByteArray(ByteArrayDecoderDelta),
}
260
261impl ByteArrayDecoder {
262    pub fn new(
263        encoding: Encoding,
264        data: Bytes,
265        num_levels: usize,
266        num_values: Option<usize>,
267        validate_utf8: bool,
268    ) -> Result<Self> {
269        let decoder = match encoding {
270            Encoding::PLAIN => ByteArrayDecoder::Plain(ByteArrayDecoderPlain::new(
271                data,
272                num_levels,
273                num_values,
274                validate_utf8,
275            )),
276            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => ByteArrayDecoder::Dictionary(
277                ByteArrayDecoderDictionary::new(data, num_levels, num_values),
278            ),
279            Encoding::DELTA_LENGTH_BYTE_ARRAY => ByteArrayDecoder::DeltaLength(
280                ByteArrayDecoderDeltaLength::new(data, validate_utf8)?,
281            ),
282            Encoding::DELTA_BYTE_ARRAY => {
283                ByteArrayDecoder::DeltaByteArray(ByteArrayDecoderDelta::new(data, validate_utf8)?)
284            }
285            _ => {
286                return Err(general_err!(
287                    "unsupported encoding for byte array: {}",
288                    encoding
289                ))
290            }
291        };
292
293        Ok(decoder)
294    }
295
296    /// Read up to `len` values to `out` with the optional dictionary
297    pub fn read<I: OffsetSizeTrait>(
298        &mut self,
299        out: &mut OffsetBuffer<I>,
300        len: usize,
301        dict: Option<&OffsetBuffer<I>>,
302    ) -> Result<usize> {
303        match self {
304            ByteArrayDecoder::Plain(d) => d.read(out, len),
305            ByteArrayDecoder::Dictionary(d) => {
306                let dict =
307                    dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;
308
309                d.read(out, dict, len)
310            }
311            ByteArrayDecoder::DeltaLength(d) => d.read(out, len),
312            ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
313        }
314    }
315
316    /// Skip `len` values
317    pub fn skip<I: OffsetSizeTrait>(
318        &mut self,
319        len: usize,
320        dict: Option<&OffsetBuffer<I>>,
321    ) -> Result<usize> {
322        match self {
323            ByteArrayDecoder::Plain(d) => d.skip(len),
324            ByteArrayDecoder::Dictionary(d) => {
325                let dict =
326                    dict.ok_or_else(|| general_err!("missing dictionary page for column"))?;
327
328                d.skip(dict, len)
329            }
330            ByteArrayDecoder::DeltaLength(d) => d.skip(len),
331            ByteArrayDecoder::DeltaByteArray(d) => d.skip(len),
332        }
333    }
334}
335
/// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
pub struct ByteArrayDecoderPlain {
    /// Raw page data: a sequence of little-endian u32 length prefixes, each
    /// followed by that many value bytes
    buf: Bytes,
    /// Byte offset into `buf` of the next value to decode
    offset: usize,
    /// Whether decoded values must be valid UTF-8
    validate_utf8: bool,

    /// This is a maximum as the null count is not always known, e.g. value data from
    /// a v1 data page
    max_remaining_values: usize,
}
346
impl ByteArrayDecoderPlain {
    /// Creates a decoder for PLAIN encoded data in `buf`
    ///
    /// `num_values` is the non-null value count if known; otherwise
    /// `num_levels` serves as an upper bound
    pub fn new(
        buf: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
        validate_utf8: bool,
    ) -> Self {
        Self {
            buf,
            validate_utf8,
            offset: 0,
            max_remaining_values: num_values.unwrap_or(num_levels),
        }
    }

    /// Reads up to `len` values into `output`, returning the number read
    pub fn read<I: OffsetSizeTrait>(
        &mut self,
        output: &mut OffsetBuffer<I>,
        len: usize,
    ) -> Result<usize> {
        // Remember where this call's bytes begin so UTF-8 validation only
        // covers newly appended data
        let initial_values_length = output.values.len();

        let to_read = len.min(self.max_remaining_values);
        output.offsets.reserve(to_read);

        let remaining_bytes = self.buf.len() - self.offset;
        if remaining_bytes == 0 {
            return Ok(0);
        }

        // Estimate the value bytes needed assuming roughly evenly sized
        // values; on multiplication overflow, skip the reservation entirely
        let estimated_bytes = remaining_bytes
            .checked_mul(to_read)
            .map(|x| x / self.max_remaining_values)
            .unwrap_or_default();

        output.values.reserve(estimated_bytes);

        let mut read = 0;

        // Each value is a little-endian u32 length prefix followed by that
        // many bytes; both the prefix and payload are bounds-checked
        let buf = self.buf.as_ref();
        while self.offset < self.buf.len() && read != to_read {
            if self.offset + 4 > buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }
            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
            let len = u32::from_le_bytes(len_bytes);

            let start_offset = self.offset + 4;
            let end_offset = start_offset + len as usize;
            if end_offset > buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }

            output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;

            self.offset = end_offset;
            read += 1;
        }
        // NOTE(review): decremented by `to_read` even if the buffer ran out
        // before `to_read` values were produced, and `to_read` (not `read`)
        // is returned — presumably the counts agree for well-formed pages;
        // confirm against the page-writing side
        self.max_remaining_values -= to_read;

        // Validate all bytes appended by this call in a single pass
        if self.validate_utf8 {
            output.check_valid_utf8(initial_values_length)?;
        }
        Ok(to_read)
    }

    /// Skips up to `to_skip` values, returning the number skipped
    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let to_skip = to_skip.min(self.max_remaining_values);
        let mut skip = 0;
        let buf = self.buf.as_ref();

        while self.offset < self.buf.len() && skip != to_skip {
            if self.offset + 4 > buf.len() {
                return Err(ParquetError::EOF("eof decoding byte array".into()));
            }
            let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
            let len = u32::from_le_bytes(len_bytes) as usize;
            skip += 1;
            // Advance past the 4-byte length prefix and the value bytes
            self.offset = self.offset + 4 + len;
        }
        self.max_remaining_values -= skip;
        Ok(skip)
    }
}
431
/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
pub struct ByteArrayDecoderDeltaLength {
    /// All value lengths, eagerly decoded from the front of the page
    lengths: Vec<i32>,
    /// The raw page data; the concatenated value bytes follow the encoded lengths
    data: Bytes,
    /// Index into `lengths` of the next value to decode
    length_offset: usize,
    /// Byte offset into `data` of the next value to decode
    data_offset: usize,
    /// Whether decoded values must be valid UTF-8
    validate_utf8: bool,
}
440
impl ByteArrayDecoderDeltaLength {
    /// Creates a decoder for `data`, eagerly decoding all value lengths and
    /// verifying they fit within the page
    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
        // The lengths are delta bit-packed at the start of the page
        let mut len_decoder = DeltaBitPackDecoder::<Int32Type>::new();
        len_decoder.set_data(data.clone(), 0)?;
        let values = len_decoder.values_left();

        let mut lengths = vec![0; values];
        len_decoder.get(&mut lengths)?;

        let mut total_bytes = 0;

        // Reject negative lengths up front so later slicing cannot wrap
        for l in lengths.iter() {
            if *l < 0 {
                return Err(ParquetError::General(
                    "negative delta length byte array length".to_string(),
                ));
            }
            total_bytes += *l as usize;
        }

        // The concatenated value bytes follow the encoded lengths; verify the
        // page actually contains all of them
        if total_bytes + len_decoder.get_offset() > data.len() {
            return Err(ParquetError::General(
                "Insufficient delta length byte array bytes".to_string(),
            ));
        }

        Ok(Self {
            lengths,
            data,
            validate_utf8,
            length_offset: 0,
            // Value data begins immediately after the encoded lengths
            data_offset: len_decoder.get_offset(),
        })
    }

    /// Reads up to `len` values into `output`, returning the number read
    fn read<I: OffsetSizeTrait>(
        &mut self,
        output: &mut OffsetBuffer<I>,
        len: usize,
    ) -> Result<usize> {
        // Remember where this call's bytes begin so UTF-8 validation only
        // covers newly appended data
        let initial_values_length = output.values.len();

        let to_read = len.min(self.lengths.len() - self.length_offset);
        output.offsets.reserve(to_read);

        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_read];

        // Lengths are known up front, so reserve exactly the bytes needed
        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();
        output.values.reserve(total_bytes);

        let mut current_offset = self.data_offset;
        for length in src_lengths {
            let end_offset = current_offset + *length as usize;
            output.try_push(
                &self.data.as_ref()[current_offset..end_offset],
                self.validate_utf8,
            )?;
            current_offset = end_offset;
        }

        self.data_offset = current_offset;
        self.length_offset += to_read;

        // Validate all bytes appended by this call in a single pass
        if self.validate_utf8 {
            output.check_valid_utf8(initial_values_length)?;
        }
        Ok(to_read)
    }

    /// Skips up to `to_skip` values, returning the number skipped
    fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let remain_values = self.lengths.len() - self.length_offset;
        let to_skip = remain_values.min(to_skip);

        // Sum the skipped lengths to advance the data offset without copying
        let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
        let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();

        self.data_offset += total_bytes;
        self.length_offset += to_skip;
        Ok(to_skip)
    }
}
522
/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
pub struct ByteArrayDecoderDelta {
    /// The underlying incremental-encoding decoder
    decoder: DeltaByteArrayDecoder,
    /// Whether decoded values must be valid UTF-8
    validate_utf8: bool,
}
528
529impl ByteArrayDecoderDelta {
530    fn new(data: Bytes, validate_utf8: bool) -> Result<Self> {
531        Ok(Self {
532            decoder: DeltaByteArrayDecoder::new(data)?,
533            validate_utf8,
534        })
535    }
536
537    fn read<I: OffsetSizeTrait>(
538        &mut self,
539        output: &mut OffsetBuffer<I>,
540        len: usize,
541    ) -> Result<usize> {
542        let initial_values_length = output.values.len();
543        output.offsets.reserve(len.min(self.decoder.remaining()));
544
545        let read = self
546            .decoder
547            .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;
548
549        if self.validate_utf8 {
550            output.check_valid_utf8(initial_values_length)?;
551        }
552        Ok(read)
553    }
554
555    fn skip(&mut self, to_skip: usize) -> Result<usize> {
556        self.decoder.skip(to_skip)
557    }
558}
559
/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
pub struct ByteArrayDecoderDictionary {
    /// Decodes the RLE/bit-packed dictionary key indices
    decoder: DictIndexDecoder,
}
564
565impl ByteArrayDecoderDictionary {
566    fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Self {
567        Self {
568            decoder: DictIndexDecoder::new(data, num_levels, num_values),
569        }
570    }
571
572    fn read<I: OffsetSizeTrait>(
573        &mut self,
574        output: &mut OffsetBuffer<I>,
575        dict: &OffsetBuffer<I>,
576        len: usize,
577    ) -> Result<usize> {
578        // All data must be NULL
579        if dict.is_empty() {
580            return Ok(0);
581        }
582
583        self.decoder.read(len, |keys| {
584            output.extend_from_dictionary(keys, dict.offsets.as_slice(), dict.values.as_slice())
585        })
586    }
587
588    fn skip<I: OffsetSizeTrait>(
589        &mut self,
590        dict: &OffsetBuffer<I>,
591        to_skip: usize,
592    ) -> Result<usize> {
593        // All data must be NULL
594        if dict.is_empty() {
595            return Ok(0);
596        }
597
598        self.decoder.skip(to_skip)
599    }
600}
601
602#[cfg(test)]
603mod tests {
604    use super::*;
605    use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column};
606    use crate::arrow::record_reader::buffer::ValuesBuffer;
607    use arrow_array::{Array, StringArray};
608    use arrow_buffer::Buffer;
609
    /// Decodes a known value sequence under every supported encoding and
    /// checks values, offsets, and null padding
    #[test]
    fn test_byte_array_decoder() {
        let (pages, encoded_dictionary) =
            byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

        let column_desc = utf8_column();
        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        // Every encoding must produce the same logical values
        for (encoding, page) in pages {
            let mut output = OffsetBuffer::<i32>::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

            assert_eq!(output.values.as_slice(), "hello".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5]);

            // Subsequent reads append to the same buffer
            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(output.values.as_slice(), "helloworld".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5, 10]);

            assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
            assert_eq!(output.values.as_slice(), "helloworldab".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]);

            // All 4 values consumed - further reads return 0
            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

            let valid = [false, false, true, true, false, true, true, false, false];
            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            // Interleave nulls into the decoded values per the validity mask
            output.pad_nulls(0, 4, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
            let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

            assert_eq!(
                strings.iter().collect::<Vec<_>>(),
                vec![
                    None,
                    None,
                    Some("hello"),
                    Some("world"),
                    None,
                    Some("a"),
                    Some("b"),
                    None,
                    None,
                ]
            );
        }
    }
664
    /// Interleaves reads and skips under every supported encoding and checks
    /// the surviving values and null padding
    #[test]
    fn test_byte_array_decoder_skip() {
        let (pages, encoded_dictionary) =
            byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

        let column_desc = utf8_column();
        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        for (encoding, page) in pages {
            let mut output = OffsetBuffer::<i32>::default();
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);

            assert_eq!(output.values.as_slice(), "hello".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5]);

            // Skip "world" and "a"
            assert_eq!(decoder.skip_values(1).unwrap(), 1);
            assert_eq!(decoder.skip_values(1).unwrap(), 1);

            assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
            assert_eq!(output.values.as_slice(), "hellob".as_bytes());
            assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);

            // All values consumed - further reads return 0
            assert_eq!(decoder.read(&mut output, 4).unwrap(), 0);

            let valid = [false, false, true, true, false, false];
            let valid_buffer = Buffer::from_iter(valid.iter().cloned());

            output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
            let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
            let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

            assert_eq!(
                strings.iter().collect::<Vec<_>>(),
                vec![None, None, Some("hello"), Some("b"), None, None,]
            );
        }
    }
708
    /// Pages built from an empty value list (all slots null) must yield zero
    /// values from both read and skip under every encoding
    #[test]
    fn test_byte_array_decoder_nulls() {
        let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());

        let column_desc = utf8_column();
        let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        // test nulls read
        for (encoding, page) in pages.clone() {
            let mut output = OffsetBuffer::<i32>::default();
            decoder.set_data(encoding, page, 4, None).unwrap();
            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);
        }

        // test nulls skip
        for (encoding, page) in pages {
            decoder.set_data(encoding, page, 4, None).unwrap();
            assert_eq!(decoder.skip_values(1024).unwrap(), 0);
        }
    }
732    }
733}