parquet/
data_type.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Data types that connect Parquet physical types with their Rust-specific
19//! representations.
20use bytes::Bytes;
21use half::f16;
22use std::cmp::Ordering;
23use std::fmt;
24use std::mem;
25use std::ops::{Deref, DerefMut};
26use std::str::from_utf8;
27
28use crate::basic::Type;
29use crate::column::reader::{ColumnReader, ColumnReaderImpl};
30use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
31use crate::errors::{ParquetError, Result};
32use crate::util::bit_util::FromBytes;
33
/// Rust representation for logical type INT96, value is backed by an array of `u32`.
/// The type only takes 12 bytes, without extra padding.
///
/// Word layout (see `data_as_days_and_nanos`): index 0 is the low 32 bits and index 1 the
/// high 32 bits of the nanoseconds component; index 2 is the Julian day number.
///
/// NOTE(review): the derived `PartialOrd` compares `value` lexicographically starting with
/// the low nanosecond word, so it does not follow chronological order — confirm no caller
/// relies on it for time ordering.
#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
pub struct Int96 {
    value: [u32; 3],
}

/// Julian day number of the Unix epoch (1970-01-01).
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;

/// Number of seconds in a day
const SECONDS_IN_DAY: i64 = 24 * 60 * 60;
/// Number of milliseconds in a second
const MILLISECONDS: i64 = 1_000;
/// Number of microseconds in a second
const MICROSECONDS: i64 = 1_000 * MILLISECONDS;
/// Number of nanoseconds in a second
const NANOSECONDS: i64 = 1_000 * MICROSECONDS;

/// Number of milliseconds in a day
const MILLISECONDS_IN_DAY: i64 = MILLISECONDS * SECONDS_IN_DAY;
/// Number of microseconds in a day
const MICROSECONDS_IN_DAY: i64 = MICROSECONDS * SECONDS_IN_DAY;
/// Number of nanoseconds in a day
const NANOSECONDS_IN_DAY: i64 = NANOSECONDS * SECONDS_IN_DAY;

impl Int96 {
    /// Creates a new INT96 value with all three words set to zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Borrows the three underlying `u32` words.
    #[inline]
    pub fn data(&self) -> &[u32] {
        self.value.as_slice()
    }

    /// Overwrites all three words of this INT96 value.
    #[inline]
    pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) {
        *self = Self {
            value: [elem0, elem1, elem2],
        };
    }

    /// Converts this INT96 into an i64 representing the number of SECONDS since EPOCH
    ///
    /// Will wrap around on overflow
    #[inline]
    pub fn to_seconds(&self) -> i64 {
        self.scaled_since_epoch(SECONDS_IN_DAY, NANOSECONDS)
    }

    /// Converts this INT96 into an i64 representing the number of MILLISECONDS since EPOCH
    ///
    /// Will wrap around on overflow
    #[inline]
    pub fn to_millis(&self) -> i64 {
        self.scaled_since_epoch(MILLISECONDS_IN_DAY, NANOSECONDS / MILLISECONDS)
    }

    /// Converts this INT96 into an i64 representing the number of MICROSECONDS since EPOCH
    ///
    /// Will wrap around on overflow
    #[inline]
    pub fn to_micros(&self) -> i64 {
        self.scaled_since_epoch(MICROSECONDS_IN_DAY, NANOSECONDS / MICROSECONDS)
    }

    /// Converts this INT96 into an i64 representing the number of NANOSECONDS since EPOCH
    ///
    /// Will wrap around on overflow
    #[inline]
    pub fn to_nanos(&self) -> i64 {
        self.scaled_since_epoch(NANOSECONDS_IN_DAY, 1)
    }

    /// Shared body of the `to_*` conversions: whole days since the epoch multiplied by
    /// `units_per_day`, plus the intra-day nanoseconds divided (truncating) by
    /// `nanos_per_unit`. Multiplication and addition wrap on overflow.
    #[inline]
    fn scaled_since_epoch(&self, units_per_day: i64, nanos_per_unit: i64) -> i64 {
        let (julian_day, nanos) = self.data_as_days_and_nanos();
        let days_since_epoch = i64::from(julian_day) - JULIAN_DAY_OF_EPOCH;
        days_since_epoch
            .wrapping_mul(units_per_day)
            .wrapping_add(nanos / nanos_per_unit)
    }

    /// Splits the words into (Julian day, nanoseconds), where the nanoseconds are the
    /// two low words reinterpreted as a signed 64-bit integer.
    #[inline]
    fn data_as_days_and_nanos(&self) -> (i32, i64) {
        let [nanos_lo, nanos_hi, julian_day] = self.value;
        // The low 32 bits of the shifted high word are zero, so OR equals the original `+`.
        let nanos = (i64::from(nanos_hi) << 32) | i64::from(nanos_lo);
        (julian_day as i32, nanos)
    }
}

impl From<Vec<u32>> for Int96 {
    /// Builds an INT96 from exactly three words; panics if `buf` has any other length.
    fn from(buf: Vec<u32>) -> Self {
        assert_eq!(buf.len(), 3);
        Self {
            value: [buf[0], buf[1], buf[2]],
        }
    }
}

impl fmt::Display for Int96 {
    /// Prints the three words as a debug list, e.g. `[0, 1, 2440588]`.
    #[cold]
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!("{:?}", self.data()))
    }
}
144
145/// Rust representation for BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY Parquet physical types.
146/// Value is backed by a byte buffer.
147#[derive(Clone, Default)]
148pub struct ByteArray {
149    data: Option<Bytes>,
150}
151
152// Special case Debug that prints out byte arrays that are valid utf8 as &str's
153impl std::fmt::Debug for ByteArray {
154    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155        let mut debug_struct = f.debug_struct("ByteArray");
156        match self.as_utf8() {
157            Ok(s) => debug_struct.field("data", &s),
158            Err(_) => debug_struct.field("data", &self.data),
159        };
160        debug_struct.finish()
161    }
162}
163
164impl PartialOrd for ByteArray {
165    fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
166        // sort nulls first (consistent with PartialCmp on Option)
167        //
168        // Since ByteBuffer doesn't implement PartialOrd, so can't
169        // derive an implementation
170        match (&self.data, &other.data) {
171            (None, None) => Some(Ordering::Equal),
172            (None, Some(_)) => Some(Ordering::Less),
173            (Some(_), None) => Some(Ordering::Greater),
174            (Some(self_data), Some(other_data)) => {
175                // compare slices directly
176                self_data.partial_cmp(&other_data)
177            }
178        }
179    }
180}
181
182impl ByteArray {
183    /// Creates new byte array with no data set.
184    #[inline]
185    pub fn new() -> Self {
186        ByteArray { data: None }
187    }
188
189    /// Gets length of the underlying byte buffer.
190    #[inline]
191    pub fn len(&self) -> usize {
192        assert!(self.data.is_some());
193        self.data.as_ref().unwrap().len()
194    }
195
196    /// Checks if the underlying buffer is empty.
197    #[inline]
198    pub fn is_empty(&self) -> bool {
199        self.len() == 0
200    }
201
202    /// Returns slice of data.
203    #[inline]
204    pub fn data(&self) -> &[u8] {
205        self.data
206            .as_ref()
207            .expect("set_data should have been called")
208            .as_ref()
209    }
210
211    /// Set data from another byte buffer.
212    #[inline]
213    pub fn set_data(&mut self, data: Bytes) {
214        self.data = Some(data);
215    }
216
217    /// Returns `ByteArray` instance with slice of values for a data.
218    #[inline]
219    pub fn slice(&self, start: usize, len: usize) -> Self {
220        Self::from(
221            self.data
222                .as_ref()
223                .expect("set_data should have been called")
224                .slice(start..start + len),
225        )
226    }
227
228    /// Try to convert the byte array to a utf8 slice
229    pub fn as_utf8(&self) -> Result<&str> {
230        self.data
231            .as_ref()
232            .map(|ptr| ptr.as_ref())
233            .ok_or_else(|| general_err!("Can't convert empty byte array to utf8"))
234            .and_then(|bytes| from_utf8(bytes).map_err(|e| e.into()))
235    }
236}
237
238impl From<Vec<u8>> for ByteArray {
239    fn from(buf: Vec<u8>) -> ByteArray {
240        Self {
241            data: Some(buf.into()),
242        }
243    }
244}
245
246impl<'a> From<&'a [u8]> for ByteArray {
247    fn from(b: &'a [u8]) -> ByteArray {
248        let mut v = Vec::new();
249        v.extend_from_slice(b);
250        Self {
251            data: Some(v.into()),
252        }
253    }
254}
255
256impl<'a> From<&'a str> for ByteArray {
257    fn from(s: &'a str) -> ByteArray {
258        let mut v = Vec::new();
259        v.extend_from_slice(s.as_bytes());
260        Self {
261            data: Some(v.into()),
262        }
263    }
264}
265
266impl From<Bytes> for ByteArray {
267    fn from(value: Bytes) -> Self {
268        Self { data: Some(value) }
269    }
270}
271
272impl From<f16> for ByteArray {
273    fn from(value: f16) -> Self {
274        Self::from(value.to_le_bytes().as_slice())
275    }
276}
277
278impl PartialEq for ByteArray {
279    fn eq(&self, other: &ByteArray) -> bool {
280        match (&self.data, &other.data) {
281            (Some(d1), Some(d2)) => d1.as_ref() == d2.as_ref(),
282            (None, None) => true,
283            _ => false,
284        }
285    }
286}
287
288impl fmt::Display for ByteArray {
289    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
290        write!(f, "{:?}", self.data())
291    }
292}
293
/// Wrapper type for performance reasons: this represents `FIXED_LEN_BYTE_ARRAY`, but in all
/// other respects behaves the same as [`ByteArray`] (it dereferences to the wrapped value).
///
/// # Performance notes
///
/// This type is a little unfortunate; without it the compiler generates code that takes quite a
/// big hit on the CPU pipeline. Essentially the previous version stalls awaiting the result of
/// `T::get_physical_type() == Type::FIXED_LEN_BYTE_ARRAY`.
///
/// It's debatable whether this is wanted: it is out of spec for what Parquet documents as its
/// base types, although there are code paths in the Rust (and potentially the C++) versions
/// that warrant this.
///
/// With this wrapper type the compiler generates more targeted code paths matching the higher
/// level logical types, removing the data hazard from all decoding and encoding paths.
///
/// `#[repr(transparent)]` gives this type the same memory layout as the wrapped `ByteArray`.
#[repr(transparent)]
#[derive(Clone, Debug, Default)]
pub struct FixedLenByteArray(ByteArray);
311
312impl PartialEq for FixedLenByteArray {
313    fn eq(&self, other: &FixedLenByteArray) -> bool {
314        self.0.eq(&other.0)
315    }
316}
317
318impl PartialEq<ByteArray> for FixedLenByteArray {
319    fn eq(&self, other: &ByteArray) -> bool {
320        self.0.eq(other)
321    }
322}
323
324impl PartialEq<FixedLenByteArray> for ByteArray {
325    fn eq(&self, other: &FixedLenByteArray) -> bool {
326        self.eq(&other.0)
327    }
328}
329
// Displays a `FixedLenByteArray` by calling `fmt` on the wrapped `ByteArray`, passing the
// same formatter through.
// NOTE(review): `ByteArray` implements both `Debug` and `Display`, and this `fmt` is picked by
// method lookup; presumably it resolves to `Display` — confirm, or make it explicit with
// `fmt::Display::fmt(&self.0, f)`.
impl fmt::Display for FixedLenByteArray {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.0.fmt(f)
    }
}
335
336impl PartialOrd for FixedLenByteArray {
337    fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
338        self.0.partial_cmp(&other.0)
339    }
340}
341
342impl PartialOrd<FixedLenByteArray> for ByteArray {
343    fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
344        self.partial_cmp(&other.0)
345    }
346}
347
348impl PartialOrd<ByteArray> for FixedLenByteArray {
349    fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
350        self.0.partial_cmp(other)
351    }
352}
353
354impl Deref for FixedLenByteArray {
355    type Target = ByteArray;
356
357    fn deref(&self) -> &Self::Target {
358        &self.0
359    }
360}
361
362impl DerefMut for FixedLenByteArray {
363    fn deref_mut(&mut self) -> &mut Self::Target {
364        &mut self.0
365    }
366}
367
368impl From<ByteArray> for FixedLenByteArray {
369    fn from(other: ByteArray) -> Self {
370        Self(other)
371    }
372}
373
374impl From<Vec<u8>> for FixedLenByteArray {
375    fn from(buf: Vec<u8>) -> FixedLenByteArray {
376        FixedLenByteArray(ByteArray::from(buf))
377    }
378}
379
380impl From<FixedLenByteArray> for ByteArray {
381    fn from(other: FixedLenByteArray) -> Self {
382        other.0
383    }
384}
385
/// Rust representation for Decimal values.
///
/// This is not a representation of Parquet physical type, but rather a wrapper for
/// DECIMAL logical type, and serves as container for raw parts of decimal values:
/// unscaled value in bytes, precision and scale.
///
/// The variant only records how the unscaled value is stored; equality (see the
/// `PartialEq` impl) compares precision, scale and bytes, not the variant.
#[derive(Clone, Debug)]
pub enum Decimal {
    /// Decimal backed by `i32`.
    Int32 {
        /// The unscaled value as 4 bytes ([`Decimal::from_i32`] stores them big-endian)
        value: [u8; 4],
        /// The total number of digits in the number
        precision: i32,
        /// The number of digits to the right of the decimal point
        scale: i32,
    },
    /// Decimal backed by `i64`.
    Int64 {
        /// The unscaled value as 8 bytes ([`Decimal::from_i64`] stores them big-endian)
        value: [u8; 8],
        /// The total number of digits in the number
        precision: i32,
        /// The number of digits to the right of the decimal point
        scale: i32,
    },
    /// Decimal backed by byte array.
    Bytes {
        /// The unscaled value as raw bytes (encoding is whatever the caller supplied —
        /// presumably big-endian two's complement like the other variants; confirm)
        value: ByteArray,
        /// The total number of digits in the number
        precision: i32,
        /// The number of digits to the right of the decimal point
        scale: i32,
    },
}
421
422impl Decimal {
423    /// Creates new decimal value from `i32`.
424    pub fn from_i32(value: i32, precision: i32, scale: i32) -> Self {
425        let bytes = value.to_be_bytes();
426        Decimal::Int32 {
427            value: bytes,
428            precision,
429            scale,
430        }
431    }
432
433    /// Creates new decimal value from `i64`.
434    pub fn from_i64(value: i64, precision: i32, scale: i32) -> Self {
435        let bytes = value.to_be_bytes();
436        Decimal::Int64 {
437            value: bytes,
438            precision,
439            scale,
440        }
441    }
442
443    /// Creates new decimal value from `ByteArray`.
444    pub fn from_bytes(value: ByteArray, precision: i32, scale: i32) -> Self {
445        Decimal::Bytes {
446            value,
447            precision,
448            scale,
449        }
450    }
451
452    /// Returns bytes of unscaled value.
453    pub fn data(&self) -> &[u8] {
454        match *self {
455            Decimal::Int32 { ref value, .. } => value,
456            Decimal::Int64 { ref value, .. } => value,
457            Decimal::Bytes { ref value, .. } => value.data(),
458        }
459    }
460
461    /// Returns decimal precision.
462    pub fn precision(&self) -> i32 {
463        match *self {
464            Decimal::Int32 { precision, .. } => precision,
465            Decimal::Int64 { precision, .. } => precision,
466            Decimal::Bytes { precision, .. } => precision,
467        }
468    }
469
470    /// Returns decimal scale.
471    pub fn scale(&self) -> i32 {
472        match *self {
473            Decimal::Int32 { scale, .. } => scale,
474            Decimal::Int64 { scale, .. } => scale,
475            Decimal::Bytes { scale, .. } => scale,
476        }
477    }
478}
479
480impl Default for Decimal {
481    fn default() -> Self {
482        Self::from_i32(0, 0, 0)
483    }
484}
485
486impl PartialEq for Decimal {
487    fn eq(&self, other: &Decimal) -> bool {
488        self.precision() == other.precision()
489            && self.scale() == other.scale()
490            && self.data() == other.data()
491    }
492}
493
/// Converts an instance of data type to a slice of bytes as `u8`.
pub trait AsBytes {
    /// Returns slice of bytes for this data type.
    fn as_bytes(&self) -> &[u8];
}

/// Converts a slice of a data type to a slice of bytes.
///
/// Some implementors in this module (`bool`, `Int96`, `ByteArray`, `FixedLenByteArray`)
/// panic with `unimplemented!()` instead of providing a byte view.
pub trait SliceAsBytes: Sized {
    /// Returns slice of bytes for a slice of this data type.
    fn slice_as_bytes(self_: &[Self]) -> &[u8];
    /// Return the internal representation as a mutable slice
    ///
    /// # Safety
    /// If modified you are _required_ to ensure the internal representation
    /// is valid and correct for the actual raw data
    unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8];
}
511
512impl AsBytes for [u8] {
513    fn as_bytes(&self) -> &[u8] {
514        self
515    }
516}
517
// Implements `AsBytes` and `SliceAsBytes` for a primitive numeric type by viewing its
// in-memory (native byte order) representation as bytes.
macro_rules! gen_as_bytes {
    ($source_ty:ident) => {
        impl AsBytes for $source_ty {
            #[allow(clippy::size_of_in_element_count)]
            fn as_bytes(&self) -> &[u8] {
                // SAFETY: macro is only used with primitive types that have no padding, so the
                // resulting slice always refers to initialized memory. `u8` has alignment 1,
                // and the returned slice borrows `self`, so it cannot outlive the value.
                unsafe {
                    std::slice::from_raw_parts(
                        self as *const $source_ty as *const u8,
                        std::mem::size_of::<$source_ty>(),
                    )
                }
            }
        }

        impl SliceAsBytes for $source_ty {
            #[inline]
            #[allow(clippy::size_of_in_element_count)]
            fn slice_as_bytes(self_: &[Self]) -> &[u8] {
                // SAFETY: macro is only used with primitive types that have no padding, so the
                // resulting slice always refers to initialized memory. `size_of_val` gives the
                // total byte length of all elements.
                unsafe {
                    std::slice::from_raw_parts(
                        self_.as_ptr() as *const u8,
                        std::mem::size_of_val(self_),
                    )
                }
            }

            #[inline]
            #[allow(clippy::size_of_in_element_count)]
            unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] {
                // SAFETY: macro is only used with primitive types that have no padding, so the
                // resulting slice always refers to initialized memory. Moreover, self has no
                // invalid bit patterns, so all writes to the resulting slice will be valid.
                unsafe {
                    std::slice::from_raw_parts_mut(
                        self_.as_mut_ptr() as *mut u8,
                        std::mem::size_of_val(self_),
                    )
                }
            }
        }
    };
}
564
565gen_as_bytes!(i8);
566gen_as_bytes!(i16);
567gen_as_bytes!(i32);
568gen_as_bytes!(i64);
569gen_as_bytes!(u8);
570gen_as_bytes!(u16);
571gen_as_bytes!(u32);
572gen_as_bytes!(u64);
573gen_as_bytes!(f32);
574gen_as_bytes!(f64);
575
576macro_rules! unimplemented_slice_as_bytes {
577    ($ty: ty) => {
578        impl SliceAsBytes for $ty {
579            fn slice_as_bytes(_self: &[Self]) -> &[u8] {
580                unimplemented!()
581            }
582
583            unsafe fn slice_as_bytes_mut(_self: &mut [Self]) -> &mut [u8] {
584                unimplemented!()
585            }
586        }
587    };
588}
589
590// TODO - Can Int96 and bool be implemented in these terms?
591unimplemented_slice_as_bytes!(Int96);
592unimplemented_slice_as_bytes!(bool);
593unimplemented_slice_as_bytes!(ByteArray);
594unimplemented_slice_as_bytes!(FixedLenByteArray);
595
596impl AsBytes for bool {
597    fn as_bytes(&self) -> &[u8] {
598        // SAFETY: a bool is guaranteed to be either 0x00 or 0x01 in memory, so the memory is
599        // valid.
600        unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) }
601    }
602}
603
604impl AsBytes for Int96 {
605    fn as_bytes(&self) -> &[u8] {
606        // SAFETY: Int96::data is a &[u32; 3].
607        unsafe { std::slice::from_raw_parts(self.data() as *const [u32] as *const u8, 12) }
608    }
609}
610
611impl AsBytes for ByteArray {
612    fn as_bytes(&self) -> &[u8] {
613        self.data()
614    }
615}
616
617impl AsBytes for FixedLenByteArray {
618    fn as_bytes(&self) -> &[u8] {
619        self.data()
620    }
621}
622
623impl AsBytes for Decimal {
624    fn as_bytes(&self) -> &[u8] {
625        self.data()
626    }
627}
628
629impl AsBytes for Vec<u8> {
630    fn as_bytes(&self) -> &[u8] {
631        self.as_slice()
632    }
633}
634
635impl AsBytes for &str {
636    fn as_bytes(&self) -> &[u8] {
637        (self as &str).as_bytes()
638    }
639}
640
641impl AsBytes for str {
642    fn as_bytes(&self) -> &[u8] {
643        (self as &str).as_bytes()
644    }
645}
646
647pub(crate) mod private {
648    use bytes::Bytes;
649
650    use crate::encodings::decoding::PlainDecoderDetails;
651    use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter};
652
653    use super::{ParquetError, Result, SliceAsBytes};
654    use crate::basic::Type;
655    use crate::file::metadata::HeapSize;
656
657    /// Sealed trait to start to remove specialisation from implementations
658    ///
659    /// This is done to force the associated value type to be unimplementable outside of this
660    /// crate, and thus hint to the type system (and end user) traits are public for the contract
661    /// and not for extension.
662    pub trait ParquetValueType:
663        PartialEq
664        + std::fmt::Debug
665        + std::fmt::Display
666        + Default
667        + Clone
668        + super::AsBytes
669        + super::FromBytes
670        + SliceAsBytes
671        + PartialOrd
672        + Send
673        + HeapSize
674        + crate::encodings::decoding::private::GetDecoder
675        + crate::file::statistics::private::MakeStatistics
676    {
677        const PHYSICAL_TYPE: Type;
678
679        /// Encode the value directly from a higher level encoder
680        fn encode<W: std::io::Write>(
681            values: &[Self],
682            writer: &mut W,
683            bit_writer: &mut BitWriter,
684        ) -> Result<()>;
685
686        /// Establish the data that will be decoded in a buffer
687        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize);
688
689        /// Decode the value from a given buffer for a higher level decoder
690        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize>;
691
692        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize>;
693
694        /// Return the encoded size for a type
695        fn dict_encoding_size(&self) -> (usize, usize) {
696            (std::mem::size_of::<Self>(), 1)
697        }
698
699        /// Return the number of variable length bytes in a given slice of data
700        ///
701        /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types
702        fn variable_length_bytes(_: &[Self]) -> Option<i64> {
703            None
704        }
705
706        /// Return the value as i64 if possible
707        ///
708        /// This is essentially the same as `std::convert::TryInto<i64>` but can't be
709        /// implemented for `f32` and `f64`, types that would fail orphan rules
710        fn as_i64(&self) -> Result<i64> {
711            Err(general_err!("Type cannot be converted to i64"))
712        }
713
714        /// Return the value as u64 if possible
715        ///
716        /// This is essentially the same as `std::convert::TryInto<u64>` but can't be
717        /// implemented for `f32` and `f64`, types that would fail orphan rules
718        fn as_u64(&self) -> Result<u64> {
719            self.as_i64()
720                .map_err(|_| general_err!("Type cannot be converted to u64"))
721                .map(|x| x as u64)
722        }
723
724        /// Return the value as an Any to allow for downcasts without transmutation
725        fn as_any(&self) -> &dyn std::any::Any;
726
727        /// Return the value as an mutable Any to allow for downcasts without transmutation
728        fn as_mut_any(&mut self) -> &mut dyn std::any::Any;
729
730        /// Sets the value of this object from the provided [`Bytes`]
731        ///
732        /// Only implemented for `ByteArray` and `FixedLenByteArray`. Will panic for other types.
733        fn set_from_bytes(&mut self, _data: Bytes) {
734            unimplemented!();
735        }
736    }
737
738    impl ParquetValueType for bool {
739        const PHYSICAL_TYPE: Type = Type::BOOLEAN;
740
741        #[inline]
742        fn encode<W: std::io::Write>(
743            values: &[Self],
744            _: &mut W,
745            bit_writer: &mut BitWriter,
746        ) -> Result<()> {
747            for value in values {
748                bit_writer.put_value(*value as u64, 1)
749            }
750            Ok(())
751        }
752
753        #[inline]
754        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
755            decoder.bit_reader.replace(BitReader::new(data));
756            decoder.num_values = num_values;
757        }
758
759        #[inline]
760        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
761            let bit_reader = decoder.bit_reader.as_mut().unwrap();
762            let num_values = std::cmp::min(buffer.len(), decoder.num_values);
763            let values_read = bit_reader.get_batch(&mut buffer[..num_values], 1);
764            decoder.num_values -= values_read;
765            Ok(values_read)
766        }
767
768        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
769            let bit_reader = decoder.bit_reader.as_mut().unwrap();
770            let num_values = std::cmp::min(num_values, decoder.num_values);
771            let values_read = bit_reader.skip(num_values, 1);
772            decoder.num_values -= values_read;
773            Ok(values_read)
774        }
775
776        #[inline]
777        fn as_i64(&self) -> Result<i64> {
778            Ok(*self as i64)
779        }
780
781        #[inline]
782        fn as_any(&self) -> &dyn std::any::Any {
783            self
784        }
785
786        #[inline]
787        fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
788            self
789        }
790    }
791
// Implements `ParquetValueType` for a fixed-width plain type (i32, i64, f32, f64) whose
// PLAIN encoding is its raw in-memory bytes. `$as_i64` supplies the body of `as_i64`, with
// `$self` naming the receiver.
macro_rules! impl_from_raw {
    ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
        impl ParquetValueType for $ty {
            const PHYSICAL_TYPE: Type = $physical_ty;

            /// Writes the values' in-memory bytes verbatim.
            #[inline]
            fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
                // Reuse the `SliceAsBytes` byte view (which computes exactly the same
                // pointer/length pair for these primitives) instead of repeating an `unsafe`
                // raw-pointer cast here.
                writer.write_all(<Self as SliceAsBytes>::slice_as_bytes(values))?;

                Ok(())
            }

            /// Installs `data` as the source buffer and resets the read position.
            #[inline]
            fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
                decoder.data.replace(data);
                decoder.start = 0;
                decoder.num_values = num_values;
            }

            /// Copies up to `buffer.len()` values (bounded by the values remaining) out of the
            /// source buffer; errors if fewer bytes remain than those values need.
            #[inline]
            fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
                let data = decoder.data.as_ref().expect("set_data should have been called");
                let num_values = std::cmp::min(buffer.len(), decoder.num_values);
                let bytes_left = data.len() - decoder.start;
                let bytes_to_decode = std::mem::size_of::<Self>() * num_values;

                if bytes_left < bytes_to_decode {
                    return Err(eof_err!("Not enough bytes to decode"));
                }

                // Borrow the input range directly: `Bytes::slice` would create (and then drop)
                // a new reference-counted handle just to copy out of it.
                let src = &data[decoder.start..decoder.start + bytes_to_decode];
                // SAFETY: Self has no invalid bit patterns, so writing arbitrary bytes through
                // the view obtained with slice_as_bytes_mut is always safe.
                let dst = unsafe { Self::slice_as_bytes_mut(&mut buffer[..num_values]) };
                dst.copy_from_slice(src);

                decoder.start += bytes_to_decode;
                decoder.num_values -= num_values;

                Ok(num_values)
            }

            /// Advances past up to `num_values` values without copying them.
            #[inline]
            fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
                let data = decoder.data.as_ref().expect("set_data should have been called");
                let num_values = num_values.min(decoder.num_values);
                let bytes_left = data.len() - decoder.start;
                let bytes_to_skip = std::mem::size_of::<Self>() * num_values;

                if bytes_left < bytes_to_skip {
                    return Err(eof_err!("Not enough bytes to skip"));
                }

                decoder.start += bytes_to_skip;
                decoder.num_values -= num_values;

                Ok(num_values)
            }

            #[inline]
            fn as_i64(&$self) -> Result<i64> {
                $as_i64
            }

            #[inline]
            fn as_any(&self) -> &dyn std::any::Any {
                self
            }

            #[inline]
            fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
                self
            }
        }
    }
}
877
878    impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
879    impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
880    impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
881    impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
882
883    impl ParquetValueType for super::Int96 {
884        const PHYSICAL_TYPE: Type = Type::INT96;
885
886        #[inline]
887        fn encode<W: std::io::Write>(
888            values: &[Self],
889            writer: &mut W,
890            _: &mut BitWriter,
891        ) -> Result<()> {
892            for value in values {
893                let raw = SliceAsBytes::slice_as_bytes(value.data());
894                writer.write_all(raw)?;
895            }
896            Ok(())
897        }
898
899        #[inline]
900        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
901            decoder.data.replace(data);
902            decoder.start = 0;
903            decoder.num_values = num_values;
904        }
905
906        #[inline]
907        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
908            // TODO - Remove the duplication between this and the general slice method
909            let data = decoder
910                .data
911                .as_ref()
912                .expect("set_data should have been called");
913            let num_values = std::cmp::min(buffer.len(), decoder.num_values);
914            let bytes_left = data.len() - decoder.start;
915            let bytes_to_decode = 12 * num_values;
916
917            if bytes_left < bytes_to_decode {
918                return Err(eof_err!("Not enough bytes to decode"));
919            }
920
921            let data_range = data.slice(decoder.start..decoder.start + bytes_to_decode);
922            let bytes: &[u8] = &data_range;
923            decoder.start += bytes_to_decode;
924
925            let mut pos = 0; // position in byte array
926            for item in buffer.iter_mut().take(num_values) {
927                let elem0 = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
928                let elem1 = u32::from_le_bytes(bytes[pos + 4..pos + 8].try_into().unwrap());
929                let elem2 = u32::from_le_bytes(bytes[pos + 8..pos + 12].try_into().unwrap());
930
931                item.set_data(elem0, elem1, elem2);
932                pos += 12;
933            }
934            decoder.num_values -= num_values;
935
936            Ok(num_values)
937        }
938
939        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
940            let data = decoder
941                .data
942                .as_ref()
943                .expect("set_data should have been called");
944            let num_values = std::cmp::min(num_values, decoder.num_values);
945            let bytes_left = data.len() - decoder.start;
946            let bytes_to_skip = 12 * num_values;
947
948            if bytes_left < bytes_to_skip {
949                return Err(eof_err!("Not enough bytes to skip"));
950            }
951            decoder.start += bytes_to_skip;
952            decoder.num_values -= num_values;
953
954            Ok(num_values)
955        }
956
957        #[inline]
958        fn as_any(&self) -> &dyn std::any::Any {
959            self
960        }
961
962        #[inline]
963        fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
964            self
965        }
966    }
967
968    impl HeapSize for super::Int96 {
969        fn heap_size(&self) -> usize {
970            0 // no heap allocations
971        }
972    }
973
974    impl ParquetValueType for super::ByteArray {
975        const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
976
977        #[inline]
978        fn encode<W: std::io::Write>(
979            values: &[Self],
980            writer: &mut W,
981            _: &mut BitWriter,
982        ) -> Result<()> {
983            for value in values {
984                let len: u32 = value.len().try_into().unwrap();
985                writer.write_all(&len.to_ne_bytes())?;
986                let raw = value.data();
987                writer.write_all(raw)?;
988            }
989            Ok(())
990        }
991
992        #[inline]
993        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
994            decoder.data.replace(data);
995            decoder.start = 0;
996            decoder.num_values = num_values;
997        }
998
999        #[inline]
1000        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1001            let data = decoder
1002                .data
1003                .as_mut()
1004                .expect("set_data should have been called");
1005            let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1006            for val_array in buffer.iter_mut().take(num_values) {
1007                let len: usize =
1008                    read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1009                decoder.start += std::mem::size_of::<u32>();
1010
1011                if data.len() < decoder.start + len {
1012                    return Err(eof_err!("Not enough bytes to decode"));
1013                }
1014
1015                val_array.set_data(data.slice(decoder.start..decoder.start + len));
1016                decoder.start += len;
1017            }
1018            decoder.num_values -= num_values;
1019
1020            Ok(num_values)
1021        }
1022
1023        fn variable_length_bytes(values: &[Self]) -> Option<i64> {
1024            Some(values.iter().map(|x| x.len() as i64).sum())
1025        }
1026
1027        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1028            let data = decoder
1029                .data
1030                .as_mut()
1031                .expect("set_data should have been called");
1032            let num_values = num_values.min(decoder.num_values);
1033
1034            for _ in 0..num_values {
1035                let len: usize =
1036                    read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1037                decoder.start += std::mem::size_of::<u32>() + len;
1038            }
1039            decoder.num_values -= num_values;
1040
1041            Ok(num_values)
1042        }
1043
1044        #[inline]
1045        fn dict_encoding_size(&self) -> (usize, usize) {
1046            (std::mem::size_of::<u32>(), self.len())
1047        }
1048
1049        #[inline]
1050        fn as_any(&self) -> &dyn std::any::Any {
1051            self
1052        }
1053
1054        #[inline]
1055        fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1056            self
1057        }
1058
1059        #[inline]
1060        fn set_from_bytes(&mut self, data: Bytes) {
1061            self.set_data(data);
1062        }
1063    }
1064
1065    impl HeapSize for super::ByteArray {
1066        fn heap_size(&self) -> usize {
1067            // note: this is an estimate, not exact, so just return the size
1068            // of the actual data used, don't try to handle the fact that it may
1069            // be shared.
1070            self.data.as_ref().map(|data| data.len()).unwrap_or(0)
1071        }
1072    }
1073
1074    impl ParquetValueType for super::FixedLenByteArray {
1075        const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
1076
1077        #[inline]
1078        fn encode<W: std::io::Write>(
1079            values: &[Self],
1080            writer: &mut W,
1081            _: &mut BitWriter,
1082        ) -> Result<()> {
1083            for value in values {
1084                let raw = value.data();
1085                writer.write_all(raw)?;
1086            }
1087            Ok(())
1088        }
1089
1090        #[inline]
1091        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1092            decoder.data.replace(data);
1093            decoder.start = 0;
1094            decoder.num_values = num_values;
1095        }
1096
1097        #[inline]
1098        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1099            assert!(decoder.type_length > 0);
1100
1101            let data = decoder
1102                .data
1103                .as_mut()
1104                .expect("set_data should have been called");
1105            let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1106
1107            for item in buffer.iter_mut().take(num_values) {
1108                let len = decoder.type_length as usize;
1109
1110                if data.len() < decoder.start + len {
1111                    return Err(eof_err!("Not enough bytes to decode"));
1112                }
1113
1114                item.set_data(data.slice(decoder.start..decoder.start + len));
1115                decoder.start += len;
1116            }
1117            decoder.num_values -= num_values;
1118
1119            Ok(num_values)
1120        }
1121
1122        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1123            assert!(decoder.type_length > 0);
1124
1125            let data = decoder
1126                .data
1127                .as_mut()
1128                .expect("set_data should have been called");
1129            let num_values = std::cmp::min(num_values, decoder.num_values);
1130            for _ in 0..num_values {
1131                let len = decoder.type_length as usize;
1132
1133                if data.len() < decoder.start + len {
1134                    return Err(eof_err!("Not enough bytes to skip"));
1135                }
1136
1137                decoder.start += len;
1138            }
1139            decoder.num_values -= num_values;
1140
1141            Ok(num_values)
1142        }
1143
1144        #[inline]
1145        fn dict_encoding_size(&self) -> (usize, usize) {
1146            (std::mem::size_of::<u32>(), self.len())
1147        }
1148
1149        #[inline]
1150        fn as_any(&self) -> &dyn std::any::Any {
1151            self
1152        }
1153
1154        #[inline]
1155        fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1156            self
1157        }
1158
1159        #[inline]
1160        fn set_from_bytes(&mut self, data: Bytes) {
1161            self.set_data(data);
1162        }
1163    }
1164
1165    impl HeapSize for super::FixedLenByteArray {
1166        fn heap_size(&self) -> usize {
1167            self.0.heap_size()
1168        }
1169    }
1170}
1171
1172/// Contains the Parquet physical type information as well as the Rust primitive type
1173/// presentation.
1174pub trait DataType: 'static + Send {
1175    /// The physical type of the Parquet data type.
1176    type T: private::ParquetValueType;
1177
1178    /// Returns Parquet physical type.
1179    fn get_physical_type() -> Type {
1180        <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
1181    }
1182
1183    /// Returns size in bytes for Rust representation of the physical type.
1184    fn get_type_size() -> usize;
1185
1186    /// Returns the underlying [`ColumnReaderImpl`] for the given [`ColumnReader`].
1187    fn get_column_reader(column_writer: ColumnReader) -> Option<ColumnReaderImpl<Self>>
1188    where
1189        Self: Sized;
1190
1191    /// Returns the underlying [`ColumnWriterImpl`] for the given [`ColumnWriter`].
1192    fn get_column_writer(column_writer: ColumnWriter<'_>) -> Option<ColumnWriterImpl<'_, Self>>
1193    where
1194        Self: Sized;
1195
1196    /// Returns a reference to the underlying [`ColumnWriterImpl`] for the given [`ColumnWriter`].
1197    fn get_column_writer_ref<'a, 'b: 'a>(
1198        column_writer: &'b ColumnWriter<'a>,
1199    ) -> Option<&'b ColumnWriterImpl<'a, Self>>
1200    where
1201        Self: Sized;
1202
1203    /// Returns a mutable reference to the underlying [`ColumnWriterImpl`] for the given
1204    fn get_column_writer_mut<'a, 'b: 'a>(
1205        column_writer: &'a mut ColumnWriter<'b>,
1206    ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
1207    where
1208        Self: Sized;
1209}
1210
/// Declares a marker struct for one Parquet physical type and implements [`DataType`] for it.
///
/// Arguments: the struct name, the matching `ColumnReader` / `ColumnWriter` variant names,
/// the Rust value type, and the value returned by `get_type_size`.
macro_rules! make_type {
    ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
        #[doc = concat!("Parquet physical type: ", stringify!($name))]
        #[derive(Clone)]
        pub struct $name {}

        impl DataType for $name {
            type T = $native_ty;

            fn get_type_size() -> usize {
                $size
            }

            fn get_column_reader(column_reader: ColumnReader) -> Option<ColumnReaderImpl<Self>> {
                if let ColumnReader::$reader_ident(reader) = column_reader {
                    Some(reader)
                } else {
                    None
                }
            }

            fn get_column_writer(
                column_writer: ColumnWriter<'_>,
            ) -> Option<ColumnWriterImpl<'_, Self>> {
                if let ColumnWriter::$writer_ident(writer) = column_writer {
                    Some(writer)
                } else {
                    None
                }
            }

            fn get_column_writer_ref<'a, 'b: 'a>(
                column_writer: &'a ColumnWriter<'b>,
            ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
                if let ColumnWriter::$writer_ident(writer) = column_writer {
                    Some(writer)
                } else {
                    None
                }
            }

            fn get_column_writer_mut<'a, 'b: 'a>(
                column_writer: &'a mut ColumnWriter<'b>,
            ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
                if let ColumnWriter::$writer_ident(writer) = column_writer {
                    Some(writer)
                } else {
                    None
                }
            }
        }
    };
}
1260
1261// Generate struct definitions for all physical types
1262
1263make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
1264make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
1265make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
1266make_type!(
1267    Int96Type,
1268    Int96ColumnReader,
1269    Int96ColumnWriter,
1270    Int96,
1271    mem::size_of::<Int96>()
1272);
1273make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
1274make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
1275make_type!(
1276    ByteArrayType,
1277    ByteArrayColumnReader,
1278    ByteArrayColumnWriter,
1279    ByteArray,
1280    mem::size_of::<ByteArray>()
1281);
1282make_type!(
1283    FixedLenByteArrayType,
1284    FixedLenByteArrayColumnReader,
1285    FixedLenByteArrayColumnWriter,
1286    FixedLenByteArray,
1287    mem::size_of::<FixedLenByteArray>()
1288);
1289
/// Borrows the raw bytes of a [`ByteArray`] by delegating to its `as_bytes` method.
impl AsRef<[u8]> for ByteArray {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}

/// Borrows the raw bytes of a [`FixedLenByteArray`] by delegating to its `as_bytes` method.
impl AsRef<[u8]> for FixedLenByteArray {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}
1301
/// Panics with the given message unless `T::get_physical_type()` matches one of the
/// listed patterns.
///
/// Expands to a statement and relies on a type parameter named `T` being in scope at the
/// call site.
macro_rules! ensure_phys_ty {
    ($($ty:pat_param)|+ , $err: literal) => {
        match T::get_physical_type() {
            $($ty)|+ => {}
            _ => panic!($err),
        };
    }
}
1311
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_as_bytes() {
        // Int96: each u32 word laid out least-significant byte first.
        let int96 = Int96::from(vec![1, 2, 3]);
        assert_eq!(int96.as_bytes(), &[1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]);

        // ByteArray: the stored bytes come back unchanged.
        let byte_array = ByteArray::from(vec![1, 2, 3]);
        assert_eq!(byte_array.as_bytes(), &[1, 2, 3]);

        // Decimal: integer-backed values are most-significant byte first,
        // byte-backed values are passed through as-is.
        let from_i32 = Decimal::from_i32(123, 5, 2);
        assert_eq!(from_i32.as_bytes(), &[0, 0, 0, 123]);

        let from_i64 = Decimal::from_i64(123, 5, 2);
        assert_eq!(from_i64.as_bytes(), &[0, 0, 0, 0, 0, 0, 0, 123]);

        let from_bytes = Decimal::from_bytes(ByteArray::from(vec![1, 2, 3]), 5, 2);
        assert_eq!(from_bytes.as_bytes(), &[1, 2, 3]);
    }

    #[test]
    fn test_int96_from() {
        let int96 = Int96::from(vec![1, 12345, 1234567890]);
        assert_eq!(int96.data(), &[1, 12345, 1234567890]);
    }

    #[test]
    fn test_byte_array_from() {
        // From an owned Vec<u8> and from a &str.
        assert_eq!(ByteArray::from(b"ABC".to_vec()).data(), b"ABC");
        assert_eq!(ByteArray::from("ABC").data(), b"ABC");

        // From a `Bytes` buffer.
        let from_bytes = ByteArray::from(Bytes::from(vec![1u8, 2u8, 3u8, 4u8, 5u8]));
        assert_eq!(from_bytes.data(), &[1u8, 2u8, 3u8, 4u8, 5u8]);

        // From a Vec bound to a local first.
        let owned = vec![6u8, 7u8, 8u8, 9u8, 10u8];
        let from_owned = ByteArray::from(owned);
        assert_eq!(from_owned.data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
    }

    #[test]
    fn test_decimal_partial_eq() {
        // Equal values, including across representations.
        assert_eq!(Decimal::default(), Decimal::from_i32(0, 0, 0));
        assert_eq!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 2));
        assert_eq!(
            Decimal::from_bytes(ByteArray::from(vec![0, 0, 0, 3]), 5, 2),
            Decimal::from_i32(3, 5, 2)
        );

        // Differing value, precision, or scale.
        let base = Decimal::from_i32(222, 5, 2);
        assert!(base != Decimal::from_i32(111, 5, 2));
        assert!(base != Decimal::from_i32(222, 6, 2));
        assert!(base != Decimal::from_i32(222, 5, 3));

        // Differing backing width.
        assert!(Decimal::from_i64(222, 5, 2) != base);
    }

    #[test]
    fn test_byte_array_ord() {
        let one_two_three = ByteArray::from(vec![1, 2, 3]);
        let one_two_three_copy = ByteArray::from(vec![1, 2, 3]);
        let three_four = ByteArray::from(vec![3, 4]);
        let one_two_four = ByteArray::from(vec![1, 2, 4]);
        let empty = ByteArray::from(vec![]);
        let two_two_three = ByteArray::from(vec![2, 2, 3]);

        assert!(one_two_three < three_four);
        assert!(one_two_four > one_two_three);
        assert!(one_two_three > empty);
        assert_eq!(one_two_three, one_two_three_copy);
        assert!(two_two_three > one_two_three);
    }
}