parquet/arrow/array_reader/
fixed_len_byte_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
19use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be};
20use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
21use crate::arrow::record_reader::buffer::ValuesBuffer;
22use crate::arrow::record_reader::GenericRecordReader;
23use crate::arrow::schema::parquet_to_arrow_field;
24use crate::basic::{Encoding, Type};
25use crate::column::page::PageIterator;
26use crate::column::reader::decoder::ColumnValueDecoder;
27use crate::errors::{ParquetError, Result};
28use crate::schema::types::ColumnDescPtr;
29use arrow_array::{
30    ArrayRef, Decimal128Array, Decimal256Array, FixedSizeBinaryArray, Float16Array,
31    IntervalDayTimeArray, IntervalYearMonthArray,
32};
33use arrow_buffer::{i256, Buffer, IntervalDayTime};
34use arrow_data::ArrayDataBuilder;
35use arrow_schema::{DataType as ArrowType, IntervalUnit};
36use bytes::Bytes;
37use half::f16;
38use std::any::Any;
39use std::ops::Range;
40use std::sync::Arc;
41
42/// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column
43pub fn make_fixed_len_byte_array_reader(
44    pages: Box<dyn PageIterator>,
45    column_desc: ColumnDescPtr,
46    arrow_type: Option<ArrowType>,
47) -> Result<Box<dyn ArrayReader>> {
48    // Check if Arrow type is specified, else create it from Parquet type
49    let data_type = match arrow_type {
50        Some(t) => t,
51        None => parquet_to_arrow_field(column_desc.as_ref())?
52            .data_type()
53            .clone(),
54    };
55
56    let byte_length = match column_desc.physical_type() {
57        Type::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize,
58        t => {
59            return Err(general_err!(
60                "invalid physical type for fixed length byte array reader - {}",
61                t
62            ))
63        }
64    };
65    match &data_type {
66        ArrowType::FixedSizeBinary(_) => {}
67        ArrowType::Decimal128(_, _) => {
68            if byte_length > 16 {
69                return Err(general_err!(
70                    "decimal 128 type too large, must be less than 16 bytes, got {}",
71                    byte_length
72                ));
73            }
74        }
75        ArrowType::Decimal256(_, _) => {
76            if byte_length > 32 {
77                return Err(general_err!(
78                    "decimal 256 type too large, must be less than 32 bytes, got {}",
79                    byte_length
80                ));
81            }
82        }
83        ArrowType::Interval(_) => {
84            if byte_length != 12 {
85                // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#interval
86                return Err(general_err!(
87                    "interval type must consist of 12 bytes got {}",
88                    byte_length
89                ));
90            }
91        }
92        ArrowType::Float16 => {
93            if byte_length != 2 {
94                return Err(general_err!(
95                    "float 16 type must be 2 bytes, got {}",
96                    byte_length
97                ));
98            }
99        }
100        _ => {
101            return Err(general_err!(
102                "invalid data type for fixed length byte array reader - {}",
103                data_type
104            ))
105        }
106    }
107
108    Ok(Box::new(FixedLenByteArrayReader::new(
109        pages,
110        column_desc,
111        data_type,
112        byte_length,
113    )))
114}
115
/// An [`ArrayReader`] for FIXED_LEN_BYTE_ARRAY columns
struct FixedLenByteArrayReader {
    /// The Arrow type produced by `consume_batch`
    data_type: ArrowType,
    /// The width in bytes of every value in this column
    byte_length: usize,
    /// The source of data pages
    pages: Box<dyn PageIterator>,
    /// Definition levels of the most recently consumed batch, if any
    def_levels_buffer: Option<Vec<i16>>,
    /// Repetition levels of the most recently consumed batch, if any
    rep_levels_buffer: Option<Vec<i16>>,
    /// Accumulates raw value bytes and levels as records are read
    record_reader: GenericRecordReader<FixedLenByteArrayBuffer, ValueDecoder>,
}
124
125impl FixedLenByteArrayReader {
126    fn new(
127        pages: Box<dyn PageIterator>,
128        column_desc: ColumnDescPtr,
129        data_type: ArrowType,
130        byte_length: usize,
131    ) -> Self {
132        Self {
133            data_type,
134            byte_length,
135            pages,
136            def_levels_buffer: None,
137            rep_levels_buffer: None,
138            record_reader: GenericRecordReader::new(column_desc),
139        }
140    }
141}
142
impl ArrayReader for FixedLenByteArrayReader {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_data_type(&self) -> &ArrowType {
        &self.data_type
    }

    /// Reads up to `batch_size` records from the page iterator into the record reader
    fn read_records(&mut self, batch_size: usize) -> Result<usize> {
        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
    }

    /// Converts the records buffered so far into an array of `self.data_type`,
    /// then resets the record reader for the next batch
    fn consume_batch(&mut self) -> Result<ArrayRef> {
        let record_data = self.record_reader.consume_record_data();

        // First assemble the raw bytes into a FixedSizeBinary array; the typed
        // variants below are derived from it element-wise
        let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32))
            .len(self.record_reader.num_values())
            .add_buffer(Buffer::from_vec(record_data.buffer))
            .null_bit_buffer(self.record_reader.consume_bitmap_buffer());

        // `build_unchecked` skips validation; len, buffer and null bitmap all come from
        // the same record reader and are assumed mutually consistent here
        let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() });

        // TODO: An improvement might be to do this conversion on read
        // Note the conversions below apply to all elements regardless of null slots as the
        // conversion lambdas are all infallible. This improves performance by avoiding a branch in
        // the inner loop (see docs for `PrimitiveArray::from_unary`).
        let array: ArrayRef = match &self.data_type {
            ArrowType::Decimal128(p, s) => {
                // Bytes are big-endian; `sign_extend_be` widens them to the full 16 bytes
                let f = |b: &[u8]| i128::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal128Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Decimal256(p, s) => {
                let f = |b: &[u8]| i256::from_be_bytes(sign_extend_be(b));
                Arc::new(Decimal256Array::from_unary(&binary, f).with_precision_and_scale(*p, *s)?)
                    as ArrayRef
            }
            ArrowType::Interval(unit) => {
                // An interval is stored as 3x 32-bit unsigned integers storing months, days,
                // and milliseconds
                match unit {
                    IntervalUnit::YearMonth => {
                        // Only the months field (bytes 0..4) is kept for year-month intervals
                        let f = |b: &[u8]| i32::from_le_bytes(b[0..4].try_into().unwrap());
                        Arc::new(IntervalYearMonthArray::from_unary(&binary, f)) as ArrayRef
                    }
                    IntervalUnit::DayTime => {
                        // Days (bytes 4..8) and milliseconds (bytes 8..12); months are dropped
                        let f = |b: &[u8]| {
                            IntervalDayTime::new(
                                i32::from_le_bytes(b[4..8].try_into().unwrap()),
                                i32::from_le_bytes(b[8..12].try_into().unwrap()),
                            )
                        };
                        Arc::new(IntervalDayTimeArray::from_unary(&binary, f)) as ArrayRef
                    }
                    IntervalUnit::MonthDayNano => {
                        return Err(nyi_err!("MonthDayNano intervals not supported"));
                    }
                }
            }
            ArrowType::Float16 => {
                let f = |b: &[u8]| f16::from_le_bytes(b[..2].try_into().unwrap());
                Arc::new(Float16Array::from_unary(&binary, f)) as ArrayRef
            }
            // FixedSizeBinary: the raw array is returned as-is
            _ => Arc::new(binary) as ArrayRef,
        };

        // Stash levels so `get_def_levels`/`get_rep_levels` reflect this batch
        self.def_levels_buffer = self.record_reader.consume_def_levels();
        self.rep_levels_buffer = self.record_reader.consume_rep_levels();
        self.record_reader.reset();

        Ok(array)
    }

    /// Skips over up to `num_records` records, returning the number actually skipped
    fn skip_records(&mut self, num_records: usize) -> Result<usize> {
        skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
    }

    fn get_def_levels(&self) -> Option<&[i16]> {
        self.def_levels_buffer.as_deref()
    }

    fn get_rep_levels(&self) -> Option<&[i16]> {
        self.rep_levels_buffer.as_deref()
    }
}
229
/// A buffer of contiguously stored fixed-width values
#[derive(Default)]
struct FixedLenByteArrayBuffer {
    // Densely packed value bytes; `pad_nulls` spreads them out to level positions
    buffer: Vec<u8>,
    /// The length of each element in bytes
    // None until the first `read` establishes the width
    byte_length: Option<usize>,
}
236
237#[inline]
238fn move_values<F>(
239    buffer: &mut Vec<u8>,
240    byte_length: usize,
241    values_range: Range<usize>,
242    valid_mask: &[u8],
243    mut op: F,
244) where
245    F: FnMut(&mut Vec<u8>, usize, usize, usize),
246{
247    for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) {
248        debug_assert!(level_pos >= value_pos);
249        if level_pos <= value_pos {
250            break;
251        }
252
253        let level_pos_bytes = level_pos * byte_length;
254        let value_pos_bytes = value_pos * byte_length;
255
256        op(buffer, level_pos_bytes, value_pos_bytes, byte_length)
257    }
258}
259
impl ValuesBuffer for FixedLenByteArrayBuffer {
    /// Spreads the `values_read` densely packed values starting at `read_offset` out to
    /// their level positions indicated by `valid_mask`, growing the buffer to hold
    /// `levels_read` slots. Moves run back-to-front so no value is overwritten before
    /// it has been copied to its destination.
    fn pad_nulls(
        &mut self,
        read_offset: usize,
        values_read: usize,
        levels_read: usize,
        valid_mask: &[u8],
    ) {
        // Treated as zero-width when no element length has been established yet
        // (presumably only when no values were read) — the moves below are then no-ops
        let byte_length = self.byte_length.unwrap_or_default();

        assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length);
        self.buffer
            .resize((read_offset + levels_read) * byte_length, 0);

        let values_range = read_offset..read_offset + values_read;
        // Move the bytes from value_pos to level_pos. For values of `byte_length` <= 4,
        // the simple loop is preferred as the compiler can eliminate the loop via unrolling.
        // For `byte_length > 4`, we instead copy from non-overlapping slices. This allows
        // the loop to be vectorized, yielding much better performance.
        const VEC_CUTOFF: usize = 4;
        if byte_length > VEC_CUTOFF {
            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
                // level_pos > value_pos here (move_values skips equal positions), so the
                // split guarantees source and destination slices do not overlap
                let split = buffer.split_at_mut(level_pos_bytes);
                let dst = &mut split.1[..byte_length];
                let src = &split.0[value_pos_bytes..value_pos_bytes + byte_length];
                dst.copy_from_slice(src);
            };
            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
        } else {
            // Byte-at-a-time copy; trivially correct for in-place moves within one Vec
            let op = |buffer: &mut Vec<u8>, level_pos_bytes, value_pos_bytes, byte_length| {
                for i in 0..byte_length {
                    buffer[level_pos_bytes + i] = buffer[value_pos_bytes + i]
                }
            };
            move_values(&mut self.buffer, byte_length, values_range, valid_mask, op);
        }
    }
}
298
/// A [`ColumnValueDecoder`] for FIXED_LEN_BYTE_ARRAY column chunks
struct ValueDecoder {
    // The width in bytes of each value, taken from the column descriptor
    byte_length: usize,
    // The raw dictionary page, if one has been provided via `set_dict`
    dict_page: Option<Bytes>,
    // Decoding state for the current data page; set by `set_data`
    decoder: Option<Decoder>,
}
304
impl ColumnValueDecoder for ValueDecoder {
    type Buffer = FixedLenByteArrayBuffer;

    fn new(col: &ColumnDescPtr) -> Self {
        Self {
            byte_length: col.type_length() as usize,
            dict_page: None,
            decoder: None,
        }
    }

    /// Stores the raw dictionary page for later lookup by `read`
    ///
    /// Returns an error for unsupported dictionary encodings, or if the page is too
    /// short to hold `num_values` values of `byte_length` bytes.
    fn set_dict(
        &mut self,
        buf: Bytes,
        num_values: u32,
        encoding: Encoding,
        _is_sorted: bool,
    ) -> Result<()> {
        if !matches!(
            encoding,
            Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
        ) {
            return Err(nyi_err!(
                "Invalid/Unsupported encoding type for dictionary: {}",
                encoding
            ));
        }
        // The page must contain at least `num_values` full-width values; extra
        // trailing bytes are tolerated
        let expected_len = num_values as usize * self.byte_length;
        if expected_len > buf.len() {
            return Err(general_err!(
                "too few bytes in dictionary page, expected {} got {}",
                expected_len,
                buf.len()
            ));
        }

        self.dict_page = Some(buf);
        Ok(())
    }

    /// Initialises decoding state for a new data page of the given `encoding`
    fn set_data(
        &mut self,
        encoding: Encoding,
        data: Bytes,
        num_levels: usize,
        num_values: Option<usize>,
    ) -> Result<()> {
        self.decoder = Some(match encoding {
            Encoding::PLAIN => Decoder::Plain {
                buf: data,
                offset: 0,
            },
            Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => Decoder::Dict {
                decoder: DictIndexDecoder::new(data, num_levels, num_values),
            },
            Encoding::DELTA_BYTE_ARRAY => Decoder::Delta {
                decoder: DeltaByteArrayDecoder::new(data)?,
            },
            Encoding::BYTE_STREAM_SPLIT => Decoder::ByteStreamSplit {
                buf: data,
                offset: 0,
            },
            _ => {
                return Err(general_err!(
                    "unsupported encoding for fixed length byte array: {}",
                    encoding
                ))
            }
        });
        Ok(())
    }

    /// Decodes up to `num_values` values from the current page into `out`,
    /// returning the number of values read
    fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
        // Establish (or verify) the element width of the output buffer
        match out.byte_length {
            Some(x) => assert_eq!(x, self.byte_length),
            None => out.byte_length = Some(self.byte_length),
        }

        // NOTE(review): assumes `set_data` was called before `read` — confirm contract
        match self.decoder.as_mut().unwrap() {
            Decoder::Plain { offset, buf } => {
                // Clamp to however many whole values remain in the page
                let to_read =
                    (num_values * self.byte_length).min(buf.len() - *offset) / self.byte_length;
                let end_offset = *offset + to_read * self.byte_length;
                out.buffer
                    .extend_from_slice(&buf.as_ref()[*offset..end_offset]);
                *offset = end_offset;
                Ok(to_read)
            }
            Decoder::Dict { decoder } => {
                let dict = self.dict_page.as_ref().unwrap();
                // All data must be NULL
                if dict.is_empty() {
                    return Ok(0);
                }

                // Resolve each dictionary index to its `byte_length`-wide entry
                decoder.read(num_values, |keys| {
                    out.buffer.reserve(keys.len() * self.byte_length);
                    for key in keys {
                        let offset = *key as usize * self.byte_length;
                        let val = &dict.as_ref()[offset..offset + self.byte_length];
                        out.buffer.extend_from_slice(val);
                    }
                    Ok(())
                })
            }
            Decoder::Delta { decoder } => {
                let to_read = num_values.min(decoder.remaining());
                out.buffer.reserve(to_read * self.byte_length);

                // Delta decoding yields variable-length slices; each must match the
                // declared fixed width exactly
                decoder.read(to_read, |slice| {
                    if slice.len() != self.byte_length {
                        return Err(general_err!(
                            "encountered array with incorrect length, got {} expected {}",
                            slice.len(),
                            self.byte_length
                        ));
                    }
                    out.buffer.extend_from_slice(slice);
                    Ok(())
                })
            }
            Decoder::ByteStreamSplit { buf, offset } => {
                // we have n=`byte_length` streams of length `buf.len/byte_length`
                // to read value i, we need the i'th byte from each of the streams
                // so `offset` should be the value offset, not the byte offset
                let total_values = buf.len() / self.byte_length;
                let to_read = num_values.min(total_values - *offset);

                // now read the n streams and reassemble values into the output buffer
                read_byte_stream_split(&mut out.buffer, buf, *offset, to_read, self.byte_length);

                *offset += to_read;
                Ok(to_read)
            }
        }
    }

    /// Skips up to `num_values` values in the current page, returning the number skipped
    fn skip_values(&mut self, num_values: usize) -> Result<usize> {
        match self.decoder.as_mut().unwrap() {
            Decoder::Plain { offset, buf } => {
                let to_read = num_values.min((buf.len() - *offset) / self.byte_length);
                *offset += to_read * self.byte_length;
                Ok(to_read)
            }
            Decoder::Dict { decoder } => decoder.skip(num_values),
            Decoder::Delta { decoder } => decoder.skip(num_values),
            Decoder::ByteStreamSplit { offset, buf } => {
                // `offset` is a value offset, not a byte offset (see `read`)
                let total_values = buf.len() / self.byte_length;
                let to_read = num_values.min(total_values - *offset);
                *offset += to_read;
                Ok(to_read)
            }
        }
    }
}
460
/// Reassembles BYTE_STREAM_SPLIT encoded values and appends them to `dst`.
///
/// `src` is laid out as `data_width` contiguous byte streams, each of length
/// `src.len() / data_width`; stream `j` holds the `j`-th byte of every value
/// (an NxM matrix with N == `data_width` rows, one value per column). This reads
/// `num_values` columns of that matrix starting at value index `offset`.
///
/// Accepts any byte slice rather than `&mut Bytes`: the previous signature took
/// mutable access it never used, and callers holding `&mut Bytes` still coerce
/// here via `Deref`.
fn read_byte_stream_split(
    dst: &mut Vec<u8>,
    src: &[u8],
    offset: usize,
    num_values: usize,
    data_width: usize,
) {
    let stride = src.len() / data_width;
    let start = dst.len();
    // Zero-fill the destination region once, then scatter stream bytes into it
    dst.resize(start + num_values * data_width, 0u8);
    let dst_slc = &mut dst[start..start + num_values * data_width];
    for j in 0..data_width {
        // The `num_values`-byte window of stream `j` beginning at value `offset`
        let src_slc = &src[offset + j * stride..offset + j * stride + num_values];
        for (i, &byte) in src_slc.iter().enumerate() {
            dst_slc[i * data_width + j] = byte;
        }
    }
}
484
/// Per-page decoding state for each supported data page encoding
enum Decoder {
    /// PLAIN: values stored back-to-back; `offset` is a byte offset into `buf`
    Plain { buf: Bytes, offset: usize },
    /// RLE_DICTIONARY / PLAIN_DICTIONARY: indices into the dictionary page
    Dict { decoder: DictIndexDecoder },
    /// DELTA_BYTE_ARRAY: prefix/suffix delta encoded byte arrays
    Delta { decoder: DeltaByteArrayDecoder },
    /// BYTE_STREAM_SPLIT: `offset` is a value offset (not a byte offset) into `buf`
    ByteStreamSplit { buf: Bytes, offset: usize },
}
491
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arrow::arrow_reader::ParquetRecordBatchReader;
    use crate::arrow::ArrowWriter;
    use arrow::datatypes::Field;
    use arrow::error::Result as ArrowResult;
    use arrow_array::{Array, ListArray};
    use arrow_array::{Decimal256Array, RecordBatch};
    use bytes::Bytes;
    use std::sync::Arc;

    /// Round-trips a list-of-decimal256 batch through the Arrow writer and reader,
    /// exercising the fixed-length byte array reader with repetition levels and nulls.
    #[test]
    fn test_decimal_list() {
        let decimals = Decimal256Array::from_iter_values(
            [1, 2, 3, 4, 5, 6, 7, 8].into_iter().map(i256::from_i128),
        );

        // [[], [1], [2, 3], null, [4], null, [6, 7, 8]]
        let data = ArrayDataBuilder::new(ArrowType::List(Arc::new(Field::new_list_field(
            decimals.data_type().clone(),
            false,
        ))))
        .len(7)
        // List offsets: list i spans child values offsets[i]..offsets[i+1];
        // child value 5 falls under the null slot at index 5
        .add_buffer(Buffer::from_iter([0_i32, 0, 1, 3, 3, 4, 5, 8]))
        // Validity bitmap, LSB first: indices 3 and 5 are null
        .null_bit_buffer(Some(Buffer::from(&[0b01010111])))
        .child_data(vec![decimals.into_data()])
        .build()
        .unwrap();

        let written =
            RecordBatch::try_from_iter([("list", Arc::new(ListArray::from(data)) as ArrayRef)])
                .unwrap();

        let mut buffer = Vec::with_capacity(1024);
        let mut writer = ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap();
        writer.write(&written).unwrap();
        writer.close().unwrap();

        // Batch size 3 forces the 7 rows to come back across three batches
        let read = ParquetRecordBatchReader::try_new(Bytes::from(buffer), 3)
            .unwrap()
            .collect::<ArrowResult<Vec<_>>>()
            .unwrap();

        assert_eq!(&written.slice(0, 3), &read[0]);
        assert_eq!(&written.slice(3, 3), &read[1]);
        assert_eq!(&written.slice(6, 1), &read[2]);
    }
}