parquet/
data_type.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Data types that connect Parquet physical types with their Rust-specific
19//! representations.
20use bytes::Bytes;
21use half::f16;
22use std::cmp::Ordering;
23use std::fmt;
24use std::mem;
25use std::ops::{Deref, DerefMut};
26use std::str::from_utf8;
27
28use crate::basic::Type;
29use crate::column::reader::{ColumnReader, ColumnReaderImpl};
30use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
31use crate::errors::{ParquetError, Result};
32use crate::util::bit_util::FromBytes;
33
/// Rust representation for the Parquet physical type INT96; the value is backed by an
/// array of three `u32` words.
/// The type only takes 12 bytes, without extra padding.
#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
pub struct Int96 {
    // Three 32-bit words. The timestamp conversions in this file read words 0 and 1 as
    // the low and high halves of a nanosecond count and word 2 as a day number.
    value: [u32; 3],
}
40
/// Julian day number that corresponds to the Unix epoch (1970-01-01); subtracted from an
/// INT96 day value to obtain days since the epoch.
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;

/// Number of seconds in a day
const SECONDS_IN_DAY: i64 = 86_400;
/// Number of milliseconds in a second
const MILLISECONDS: i64 = 1_000;
/// Number of microseconds in a second
const MICROSECONDS: i64 = 1_000_000;
/// Number of nanoseconds in a second
const NANOSECONDS: i64 = 1_000_000_000;

/// Number of milliseconds in a day
const MILLISECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MILLISECONDS;
/// Number of microseconds in a day
const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MICROSECONDS;
/// Number of nanoseconds in a day
const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS;
58
59impl Int96 {
60    /// Creates new INT96 type struct with no data set.
61    pub fn new() -> Self {
62        Self { value: [0; 3] }
63    }
64
65    /// Returns underlying data as slice of [`u32`].
66    #[inline]
67    pub fn data(&self) -> &[u32] {
68        &self.value
69    }
70
71    /// Sets data for this INT96 type.
72    #[inline]
73    pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) {
74        self.value = [elem0, elem1, elem2];
75    }
76
77    /// Converts this INT96 into an i64 representing the number of MILLISECONDS since Epoch
78    #[deprecated(since = "54.0.0", note = "Use `to_millis` instead")]
79    pub fn to_i64(&self) -> i64 {
80        self.to_millis()
81    }
82
83    /// Converts this INT96 into an i64 representing the number of SECONDS since EPOCH
84    ///
85    /// Will wrap around on overflow
86    #[inline]
87    pub fn to_seconds(&self) -> i64 {
88        let (day, nanos) = self.data_as_days_and_nanos();
89        (day as i64 - JULIAN_DAY_OF_EPOCH)
90            .wrapping_mul(SECONDS_IN_DAY)
91            .wrapping_add(nanos / 1_000_000_000)
92    }
93
94    /// Converts this INT96 into an i64 representing the number of MILLISECONDS since EPOCH
95    ///
96    /// Will wrap around on overflow
97    #[inline]
98    pub fn to_millis(&self) -> i64 {
99        let (day, nanos) = self.data_as_days_and_nanos();
100        (day as i64 - JULIAN_DAY_OF_EPOCH)
101            .wrapping_mul(MILLISECONDS_IN_DAY)
102            .wrapping_add(nanos / 1_000_000)
103    }
104
105    /// Converts this INT96 into an i64 representing the number of MICROSECONDS since EPOCH
106    ///
107    /// Will wrap around on overflow
108    #[inline]
109    pub fn to_micros(&self) -> i64 {
110        let (day, nanos) = self.data_as_days_and_nanos();
111        (day as i64 - JULIAN_DAY_OF_EPOCH)
112            .wrapping_mul(MICROSECONDS_IN_DAY)
113            .wrapping_add(nanos / 1_000)
114    }
115
116    /// Converts this INT96 into an i64 representing the number of NANOSECONDS since EPOCH
117    ///
118    /// Will wrap around on overflow
119    #[inline]
120    pub fn to_nanos(&self) -> i64 {
121        let (day, nanos) = self.data_as_days_and_nanos();
122        (day as i64 - JULIAN_DAY_OF_EPOCH)
123            .wrapping_mul(NANOSECONDS_IN_DAY)
124            .wrapping_add(nanos)
125    }
126
127    #[inline]
128    fn data_as_days_and_nanos(&self) -> (i32, i64) {
129        let day = self.data()[2] as i32;
130        let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
131        (day, nanos)
132    }
133}
134
135impl From<Vec<u32>> for Int96 {
136    fn from(buf: Vec<u32>) -> Self {
137        assert_eq!(buf.len(), 3);
138        let mut result = Self::new();
139        result.set_data(buf[0], buf[1], buf[2]);
140        result
141    }
142}
143
144impl fmt::Display for Int96 {
145    #[cold]
146    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
147        write!(f, "{:?}", self.data())
148    }
149}
150
/// Rust representation for BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY Parquet physical types.
/// Value is backed by a byte buffer.
///
/// A value created with `new` or `default` has no buffer attached until `set_data` is
/// called; the conversions from byte containers attach one immediately.
#[derive(Clone, Default)]
pub struct ByteArray {
    // `None` means no buffer has been set yet.
    data: Option<Bytes>,
}
157
158// Special case Debug that prints out byte arrays that are valid utf8 as &str's
159impl std::fmt::Debug for ByteArray {
160    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161        let mut debug_struct = f.debug_struct("ByteArray");
162        match self.as_utf8() {
163            Ok(s) => debug_struct.field("data", &s),
164            Err(_) => debug_struct.field("data", &self.data),
165        };
166        debug_struct.finish()
167    }
168}
169
170impl PartialOrd for ByteArray {
171    fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
172        // sort nulls first (consistent with PartialCmp on Option)
173        //
174        // Since ByteBuffer doesn't implement PartialOrd, so can't
175        // derive an implementation
176        match (&self.data, &other.data) {
177            (None, None) => Some(Ordering::Equal),
178            (None, Some(_)) => Some(Ordering::Less),
179            (Some(_), None) => Some(Ordering::Greater),
180            (Some(self_data), Some(other_data)) => {
181                // compare slices directly
182                self_data.partial_cmp(&other_data)
183            }
184        }
185    }
186}
187
188impl ByteArray {
189    /// Creates new byte array with no data set.
190    #[inline]
191    pub fn new() -> Self {
192        ByteArray { data: None }
193    }
194
195    /// Gets length of the underlying byte buffer.
196    #[inline]
197    pub fn len(&self) -> usize {
198        assert!(self.data.is_some());
199        self.data.as_ref().unwrap().len()
200    }
201
202    /// Checks if the underlying buffer is empty.
203    #[inline]
204    pub fn is_empty(&self) -> bool {
205        self.len() == 0
206    }
207
208    /// Returns slice of data.
209    #[inline]
210    pub fn data(&self) -> &[u8] {
211        self.data
212            .as_ref()
213            .expect("set_data should have been called")
214            .as_ref()
215    }
216
217    /// Set data from another byte buffer.
218    #[inline]
219    pub fn set_data(&mut self, data: Bytes) {
220        self.data = Some(data);
221    }
222
223    /// Returns `ByteArray` instance with slice of values for a data.
224    #[inline]
225    pub fn slice(&self, start: usize, len: usize) -> Self {
226        Self::from(
227            self.data
228                .as_ref()
229                .expect("set_data should have been called")
230                .slice(start..start + len),
231        )
232    }
233
234    /// Try to convert the byte array to a utf8 slice
235    pub fn as_utf8(&self) -> Result<&str> {
236        self.data
237            .as_ref()
238            .map(|ptr| ptr.as_ref())
239            .ok_or_else(|| general_err!("Can't convert empty byte array to utf8"))
240            .and_then(|bytes| from_utf8(bytes).map_err(|e| e.into()))
241    }
242}
243
244impl From<Vec<u8>> for ByteArray {
245    fn from(buf: Vec<u8>) -> ByteArray {
246        Self {
247            data: Some(buf.into()),
248        }
249    }
250}
251
252impl<'a> From<&'a [u8]> for ByteArray {
253    fn from(b: &'a [u8]) -> ByteArray {
254        let mut v = Vec::new();
255        v.extend_from_slice(b);
256        Self {
257            data: Some(v.into()),
258        }
259    }
260}
261
262impl<'a> From<&'a str> for ByteArray {
263    fn from(s: &'a str) -> ByteArray {
264        let mut v = Vec::new();
265        v.extend_from_slice(s.as_bytes());
266        Self {
267            data: Some(v.into()),
268        }
269    }
270}
271
272impl From<Bytes> for ByteArray {
273    fn from(value: Bytes) -> Self {
274        Self { data: Some(value) }
275    }
276}
277
278impl From<f16> for ByteArray {
279    fn from(value: f16) -> Self {
280        Self::from(value.to_le_bytes().as_slice())
281    }
282}
283
284impl PartialEq for ByteArray {
285    fn eq(&self, other: &ByteArray) -> bool {
286        match (&self.data, &other.data) {
287            (Some(d1), Some(d2)) => d1.as_ref() == d2.as_ref(),
288            (None, None) => true,
289            _ => false,
290        }
291    }
292}
293
294impl fmt::Display for ByteArray {
295    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296        write!(f, "{:?}", self.data())
297    }
298}
299
/// Wrapper type for performance reasons, this represents `FIXED_LEN_BYTE_ARRAY` but in all other
/// considerations behaves the same as `ByteArray`
///
/// # Performance notes:
/// This type is a little unfortunate, without it the compiler generates code that takes quite a
/// big hit on the CPU pipeline. Essentially the previous version stalls awaiting the result of
/// `T::get_physical_type() == Type::FIXED_LEN_BYTE_ARRAY`.
///
/// It's debatable if this is wanted, it is out of spec for what parquet documents as its base
/// types, although there are code paths in the Rust (and potentially the C++) versions that
/// warrant this.
///
/// With this wrapper type the compiler generates more targeted code paths matching the higher
/// level logical types, removing the data-hazard from all decoding and encoding paths.
#[repr(transparent)]
#[derive(Clone, Debug, Default)]
pub struct FixedLenByteArray(ByteArray);
317
318impl PartialEq for FixedLenByteArray {
319    fn eq(&self, other: &FixedLenByteArray) -> bool {
320        self.0.eq(&other.0)
321    }
322}
323
324impl PartialEq<ByteArray> for FixedLenByteArray {
325    fn eq(&self, other: &ByteArray) -> bool {
326        self.0.eq(other)
327    }
328}
329
330impl PartialEq<FixedLenByteArray> for ByteArray {
331    fn eq(&self, other: &FixedLenByteArray) -> bool {
332        self.eq(&other.0)
333    }
334}
335
impl fmt::Display for FixedLenByteArray {
    /// Formats the value by forwarding to the wrapped `ByteArray`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // NOTE(review): which `fmt` trait this method call resolves to depends on the
        // traits in scope for this module — presumably `Display`; confirm.
        self.0.fmt(f)
    }
}
341
342impl PartialOrd for FixedLenByteArray {
343    fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
344        self.0.partial_cmp(&other.0)
345    }
346}
347
348impl PartialOrd<FixedLenByteArray> for ByteArray {
349    fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
350        self.partial_cmp(&other.0)
351    }
352}
353
354impl PartialOrd<ByteArray> for FixedLenByteArray {
355    fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
356        self.0.partial_cmp(other)
357    }
358}
359
360impl Deref for FixedLenByteArray {
361    type Target = ByteArray;
362
363    fn deref(&self) -> &Self::Target {
364        &self.0
365    }
366}
367
368impl DerefMut for FixedLenByteArray {
369    fn deref_mut(&mut self) -> &mut Self::Target {
370        &mut self.0
371    }
372}
373
374impl From<ByteArray> for FixedLenByteArray {
375    fn from(other: ByteArray) -> Self {
376        Self(other)
377    }
378}
379
380impl From<Vec<u8>> for FixedLenByteArray {
381    fn from(buf: Vec<u8>) -> FixedLenByteArray {
382        FixedLenByteArray(ByteArray::from(buf))
383    }
384}
385
386impl From<FixedLenByteArray> for ByteArray {
387    fn from(other: FixedLenByteArray) -> Self {
388        other.0
389    }
390}
391
/// Rust representation for Decimal values.
///
/// This is not a representation of Parquet physical type, but rather a wrapper for
/// DECIMAL logical type, and serves as container for raw parts of decimal values:
/// unscaled value in bytes, precision and scale.
#[derive(Clone, Debug)]
pub enum Decimal {
    /// Decimal backed by `i32`.
    Int32 {
        /// The underlying value; `from_i32` stores it as big-endian bytes
        value: [u8; 4],
        /// The total number of digits in the number
        precision: i32,
        /// The number of digits to the right of the decimal point
        scale: i32,
    },
    /// Decimal backed by `i64`.
    Int64 {
        /// The underlying value; `from_i64` stores it as big-endian bytes
        value: [u8; 8],
        /// The total number of digits in the number
        precision: i32,
        /// The number of digits to the right of the decimal point
        scale: i32,
    },
    /// Decimal backed by byte array.
    Bytes {
        /// The underlying value
        value: ByteArray,
        /// The total number of digits in the number
        precision: i32,
        /// The number of digits to the right of the decimal point
        scale: i32,
    },
}
427
428impl Decimal {
429    /// Creates new decimal value from `i32`.
430    pub fn from_i32(value: i32, precision: i32, scale: i32) -> Self {
431        let bytes = value.to_be_bytes();
432        Decimal::Int32 {
433            value: bytes,
434            precision,
435            scale,
436        }
437    }
438
439    /// Creates new decimal value from `i64`.
440    pub fn from_i64(value: i64, precision: i32, scale: i32) -> Self {
441        let bytes = value.to_be_bytes();
442        Decimal::Int64 {
443            value: bytes,
444            precision,
445            scale,
446        }
447    }
448
449    /// Creates new decimal value from `ByteArray`.
450    pub fn from_bytes(value: ByteArray, precision: i32, scale: i32) -> Self {
451        Decimal::Bytes {
452            value,
453            precision,
454            scale,
455        }
456    }
457
458    /// Returns bytes of unscaled value.
459    pub fn data(&self) -> &[u8] {
460        match *self {
461            Decimal::Int32 { ref value, .. } => value,
462            Decimal::Int64 { ref value, .. } => value,
463            Decimal::Bytes { ref value, .. } => value.data(),
464        }
465    }
466
467    /// Returns decimal precision.
468    pub fn precision(&self) -> i32 {
469        match *self {
470            Decimal::Int32 { precision, .. } => precision,
471            Decimal::Int64 { precision, .. } => precision,
472            Decimal::Bytes { precision, .. } => precision,
473        }
474    }
475
476    /// Returns decimal scale.
477    pub fn scale(&self) -> i32 {
478        match *self {
479            Decimal::Int32 { scale, .. } => scale,
480            Decimal::Int64 { scale, .. } => scale,
481            Decimal::Bytes { scale, .. } => scale,
482        }
483    }
484}
485
486impl Default for Decimal {
487    fn default() -> Self {
488        Self::from_i32(0, 0, 0)
489    }
490}
491
492impl PartialEq for Decimal {
493    fn eq(&self, other: &Decimal) -> bool {
494        self.precision() == other.precision()
495            && self.scale() == other.scale()
496            && self.data() == other.data()
497    }
498}
499
/// Converts an instance of data type to a slice of bytes as `u8`.
///
/// The returned slice borrows from `self`; no bytes are copied.
pub trait AsBytes {
    /// Returns slice of bytes for this data type.
    fn as_bytes(&self) -> &[u8];
}
505
/// Converts a slice of a data type to a slice of bytes.
pub trait SliceAsBytes: Sized {
    /// Returns slice of bytes for a slice of this data type.
    fn slice_as_bytes(self_: &[Self]) -> &[u8];
    /// Return the internal representation as a mutable slice
    ///
    /// # Safety
    /// If modified you are _required_ to ensure the internal representation
    /// is valid and correct for the actual raw data
    unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8];
}
517
518impl AsBytes for [u8] {
519    fn as_bytes(&self) -> &[u8] {
520        self
521    }
522}
523
524macro_rules! gen_as_bytes {
525    ($source_ty:ident) => {
526        impl AsBytes for $source_ty {
527            #[allow(clippy::size_of_in_element_count)]
528            fn as_bytes(&self) -> &[u8] {
529                // SAFETY: macro is only used with primitive types that have no padding, so the
530                // resulting slice always refers to initialized memory.
531                unsafe {
532                    std::slice::from_raw_parts(
533                        self as *const $source_ty as *const u8,
534                        std::mem::size_of::<$source_ty>(),
535                    )
536                }
537            }
538        }
539
540        impl SliceAsBytes for $source_ty {
541            #[inline]
542            #[allow(clippy::size_of_in_element_count)]
543            fn slice_as_bytes(self_: &[Self]) -> &[u8] {
544                // SAFETY: macro is only used with primitive types that have no padding, so the
545                // resulting slice always refers to initialized memory.
546                unsafe {
547                    std::slice::from_raw_parts(
548                        self_.as_ptr() as *const u8,
549                        std::mem::size_of_val(self_),
550                    )
551                }
552            }
553
554            #[inline]
555            #[allow(clippy::size_of_in_element_count)]
556            unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] {
557                // SAFETY: macro is only used with primitive types that have no padding, so the
558                // resulting slice always refers to initialized memory. Moreover, self has no
559                // invalid bit patterns, so all writes to the resulting slice will be valid.
560                unsafe {
561                    std::slice::from_raw_parts_mut(
562                        self_.as_mut_ptr() as *mut u8,
563                        std::mem::size_of_val(self_),
564                    )
565                }
566            }
567        }
568    };
569}
570
571gen_as_bytes!(i8);
572gen_as_bytes!(i16);
573gen_as_bytes!(i32);
574gen_as_bytes!(i64);
575gen_as_bytes!(u8);
576gen_as_bytes!(u16);
577gen_as_bytes!(u32);
578gen_as_bytes!(u64);
579gen_as_bytes!(f32);
580gen_as_bytes!(f64);
581
/// Implements `SliceAsBytes` for a type with placeholder bodies: both generated
/// functions panic via `unimplemented!()` when called.
macro_rules! unimplemented_slice_as_bytes {
    ($ty: ty) => {
        impl SliceAsBytes for $ty {
            fn slice_as_bytes(_self: &[Self]) -> &[u8] {
                unimplemented!()
            }

            unsafe fn slice_as_bytes_mut(_self: &mut [Self]) -> &mut [u8] {
                unimplemented!()
            }
        }
    };
}

// TODO - Can Int96 and bool be implemented in these terms?
// These types get the panicking placeholder implementation of `SliceAsBytes`.
unimplemented_slice_as_bytes!(Int96);
unimplemented_slice_as_bytes!(bool);
unimplemented_slice_as_bytes!(ByteArray);
unimplemented_slice_as_bytes!(FixedLenByteArray);
601
602impl AsBytes for bool {
603    fn as_bytes(&self) -> &[u8] {
604        // SAFETY: a bool is guaranteed to be either 0x00 or 0x01 in memory, so the memory is
605        // valid.
606        unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) }
607    }
608}
609
610impl AsBytes for Int96 {
611    fn as_bytes(&self) -> &[u8] {
612        // SAFETY: Int96::data is a &[u32; 3].
613        unsafe { std::slice::from_raw_parts(self.data() as *const [u32] as *const u8, 12) }
614    }
615}
616
617impl AsBytes for ByteArray {
618    fn as_bytes(&self) -> &[u8] {
619        self.data()
620    }
621}
622
623impl AsBytes for FixedLenByteArray {
624    fn as_bytes(&self) -> &[u8] {
625        self.data()
626    }
627}
628
629impl AsBytes for Decimal {
630    fn as_bytes(&self) -> &[u8] {
631        self.data()
632    }
633}
634
635impl AsBytes for Vec<u8> {
636    fn as_bytes(&self) -> &[u8] {
637        self.as_slice()
638    }
639}
640
641impl AsBytes for &str {
642    fn as_bytes(&self) -> &[u8] {
643        (self as &str).as_bytes()
644    }
645}
646
647impl AsBytes for str {
648    fn as_bytes(&self) -> &[u8] {
649        (self as &str).as_bytes()
650    }
651}
652
653pub(crate) mod private {
654    use bytes::Bytes;
655
656    use crate::encodings::decoding::PlainDecoderDetails;
657    use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter};
658
659    use super::{ParquetError, Result, SliceAsBytes};
660    use crate::basic::Type;
661    use crate::file::metadata::HeapSize;
662
    /// Sealed trait to start to remove specialisation from implementations
    ///
    /// This is done to force the associated value type to be unimplementable outside of this
    /// crate, and thus hint to the type system (and end user) traits are public for the contract
    /// and not for extension.
    pub trait ParquetValueType:
        PartialEq
        + std::fmt::Debug
        + std::fmt::Display
        + Default
        + Clone
        + super::AsBytes
        + super::FromBytes
        + SliceAsBytes
        + PartialOrd
        + Send
        + HeapSize
        + crate::encodings::decoding::private::GetDecoder
        + crate::file::statistics::private::MakeStatistics
    {
        /// The Parquet physical type this Rust type corresponds to
        const PHYSICAL_TYPE: Type;

        /// Encode the value directly from a higher level encoder
        fn encode<W: std::io::Write>(
            values: &[Self],
            writer: &mut W,
            bit_writer: &mut BitWriter,
        ) -> Result<()>;

        /// Establish the data that will be decoded in a buffer
        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize);

        /// Decode the value from a given buffer for a higher level decoder
        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize>;

        /// Skip values in the decoder without materialising them, returning how many
        /// values were skipped. The implementations in this module skip at most
        /// `num_values` values and never more than the decoder has remaining.
        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize>;

        /// Return the encoded size for a type
        fn dict_encoding_size(&self) -> (usize, usize) {
            (std::mem::size_of::<Self>(), 1)
        }

        /// Return the number of variable length bytes in a given slice of data
        ///
        /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types
        fn variable_length_bytes(_: &[Self]) -> Option<i64> {
            None
        }

        /// Return the value as i64 if possible
        ///
        /// This is essentially the same as `std::convert::TryInto<i64>` but can't be
        /// implemented for `f32` and `f64`, types that would fail orphan rules
        fn as_i64(&self) -> Result<i64> {
            Err(general_err!("Type cannot be converted to i64"))
        }

        /// Return the value as u64 if possible
        ///
        /// This is essentially the same as `std::convert::TryInto<u64>` but can't be
        /// implemented for `f32` and `f64`, types that would fail orphan rules
        ///
        /// The default implementation casts the result of `as_i64` with `as u64`, so a
        /// negative `i64` is reinterpreted rather than rejected.
        fn as_u64(&self) -> Result<u64> {
            self.as_i64()
                .map_err(|_| general_err!("Type cannot be converted to u64"))
                .map(|x| x as u64)
        }

        /// Return the value as an Any to allow for downcasts without transmutation
        fn as_any(&self) -> &dyn std::any::Any;

        /// Return the value as an mutable Any to allow for downcasts without transmutation
        fn as_mut_any(&mut self) -> &mut dyn std::any::Any;

        /// Sets the value of this object from the provided [`Bytes`]
        ///
        /// Only implemented for `ByteArray` and `FixedLenByteArray`. Will panic for other types.
        fn set_from_bytes(&mut self, _data: Bytes) {
            unimplemented!();
        }
    }
743
744    impl ParquetValueType for bool {
745        const PHYSICAL_TYPE: Type = Type::BOOLEAN;
746
747        #[inline]
748        fn encode<W: std::io::Write>(
749            values: &[Self],
750            _: &mut W,
751            bit_writer: &mut BitWriter,
752        ) -> Result<()> {
753            for value in values {
754                bit_writer.put_value(*value as u64, 1)
755            }
756            Ok(())
757        }
758
759        #[inline]
760        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
761            decoder.bit_reader.replace(BitReader::new(data));
762            decoder.num_values = num_values;
763        }
764
765        #[inline]
766        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
767            let bit_reader = decoder.bit_reader.as_mut().unwrap();
768            let num_values = std::cmp::min(buffer.len(), decoder.num_values);
769            let values_read = bit_reader.get_batch(&mut buffer[..num_values], 1);
770            decoder.num_values -= values_read;
771            Ok(values_read)
772        }
773
774        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
775            let bit_reader = decoder.bit_reader.as_mut().unwrap();
776            let num_values = std::cmp::min(num_values, decoder.num_values);
777            let values_read = bit_reader.skip(num_values, 1);
778            decoder.num_values -= values_read;
779            Ok(values_read)
780        }
781
782        #[inline]
783        fn as_i64(&self) -> Result<i64> {
784            Ok(*self as i64)
785        }
786
787        #[inline]
788        fn as_any(&self) -> &dyn std::any::Any {
789            self
790        }
791
792        #[inline]
793        fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
794            self
795        }
796    }
797
798    macro_rules! impl_from_raw {
799        ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
800            impl ParquetValueType for $ty {
801                const PHYSICAL_TYPE: Type = $physical_ty;
802
803                #[inline]
804                fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
805                    // SAFETY: Self is one of i32, i64, f32, f64, which have no padding.
806                    let raw = unsafe {
807                        std::slice::from_raw_parts(
808                            values.as_ptr() as *const u8,
809                            std::mem::size_of_val(values),
810                        )
811                    };
812                    writer.write_all(raw)?;
813
814                    Ok(())
815                }
816
817                #[inline]
818                fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
819                    decoder.data.replace(data);
820                    decoder.start = 0;
821                    decoder.num_values = num_values;
822                }
823
824                #[inline]
825                fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
826                    let data = decoder.data.as_ref().expect("set_data should have been called");
827                    let num_values = std::cmp::min(buffer.len(), decoder.num_values);
828                    let bytes_left = data.len() - decoder.start;
829                    let bytes_to_decode = std::mem::size_of::<Self>() * num_values;
830
831                    if bytes_left < bytes_to_decode {
832                        return Err(eof_err!("Not enough bytes to decode"));
833                    }
834
835                    {
836                        // SAFETY: Self has no invalid bit patterns, so writing to the slice
837                        // obtained with slice_as_bytes_mut is always safe.
838                        let raw_buffer = &mut unsafe { Self::slice_as_bytes_mut(buffer) }[..bytes_to_decode];
839                        raw_buffer.copy_from_slice(data.slice(
840                            decoder.start..decoder.start + bytes_to_decode
841                        ).as_ref());
842                    };
843                    decoder.start += bytes_to_decode;
844                    decoder.num_values -= num_values;
845
846                    Ok(num_values)
847                }
848
849                #[inline]
850                fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
851                    let data = decoder.data.as_ref().expect("set_data should have been called");
852                    let num_values = num_values.min(decoder.num_values);
853                    let bytes_left = data.len() - decoder.start;
854                    let bytes_to_skip = std::mem::size_of::<Self>() * num_values;
855
856                    if bytes_left < bytes_to_skip {
857                        return Err(eof_err!("Not enough bytes to skip"));
858                    }
859
860                    decoder.start += bytes_to_skip;
861                    decoder.num_values -= num_values;
862
863                    Ok(num_values)
864                }
865
866                #[inline]
867                fn as_i64(&$self) -> Result<i64> {
868                    $as_i64
869                }
870
871                #[inline]
872                fn as_any(&self) -> &dyn std::any::Any {
873                    self
874                }
875
876                #[inline]
877                fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
878                    self
879                }
880            }
881        }
882    }
883
884    impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
885    impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
886    impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
887    impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
888
889    impl ParquetValueType for super::Int96 {
890        const PHYSICAL_TYPE: Type = Type::INT96;
891
892        #[inline]
893        fn encode<W: std::io::Write>(
894            values: &[Self],
895            writer: &mut W,
896            _: &mut BitWriter,
897        ) -> Result<()> {
898            for value in values {
899                let raw = SliceAsBytes::slice_as_bytes(value.data());
900                writer.write_all(raw)?;
901            }
902            Ok(())
903        }
904
905        #[inline]
906        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
907            decoder.data.replace(data);
908            decoder.start = 0;
909            decoder.num_values = num_values;
910        }
911
912        #[inline]
913        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
914            // TODO - Remove the duplication between this and the general slice method
915            let data = decoder
916                .data
917                .as_ref()
918                .expect("set_data should have been called");
919            let num_values = std::cmp::min(buffer.len(), decoder.num_values);
920            let bytes_left = data.len() - decoder.start;
921            let bytes_to_decode = 12 * num_values;
922
923            if bytes_left < bytes_to_decode {
924                return Err(eof_err!("Not enough bytes to decode"));
925            }
926
927            let data_range = data.slice(decoder.start..decoder.start + bytes_to_decode);
928            let bytes: &[u8] = &data_range;
929            decoder.start += bytes_to_decode;
930
931            let mut pos = 0; // position in byte array
932            for item in buffer.iter_mut().take(num_values) {
933                let elem0 = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
934                let elem1 = u32::from_le_bytes(bytes[pos + 4..pos + 8].try_into().unwrap());
935                let elem2 = u32::from_le_bytes(bytes[pos + 8..pos + 12].try_into().unwrap());
936
937                item.set_data(elem0, elem1, elem2);
938                pos += 12;
939            }
940            decoder.num_values -= num_values;
941
942            Ok(num_values)
943        }
944
945        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
946            let data = decoder
947                .data
948                .as_ref()
949                .expect("set_data should have been called");
950            let num_values = std::cmp::min(num_values, decoder.num_values);
951            let bytes_left = data.len() - decoder.start;
952            let bytes_to_skip = 12 * num_values;
953
954            if bytes_left < bytes_to_skip {
955                return Err(eof_err!("Not enough bytes to skip"));
956            }
957            decoder.start += bytes_to_skip;
958            decoder.num_values -= num_values;
959
960            Ok(num_values)
961        }
962
963        #[inline]
964        fn as_any(&self) -> &dyn std::any::Any {
965            self
966        }
967
968        #[inline]
969        fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
970            self
971        }
972    }
973
    /// `Int96` keeps its three `u32` words inline, so it owns no heap memory.
    impl HeapSize for super::Int96 {
        /// Always returns `0`.
        fn heap_size(&self) -> usize {
            0 // no heap allocations
        }
    }
979
980    impl ParquetValueType for super::ByteArray {
981        const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
982
983        #[inline]
984        fn encode<W: std::io::Write>(
985            values: &[Self],
986            writer: &mut W,
987            _: &mut BitWriter,
988        ) -> Result<()> {
989            for value in values {
990                let len: u32 = value.len().try_into().unwrap();
991                writer.write_all(&len.to_ne_bytes())?;
992                let raw = value.data();
993                writer.write_all(raw)?;
994            }
995            Ok(())
996        }
997
998        #[inline]
999        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1000            decoder.data.replace(data);
1001            decoder.start = 0;
1002            decoder.num_values = num_values;
1003        }
1004
1005        #[inline]
1006        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1007            let data = decoder
1008                .data
1009                .as_mut()
1010                .expect("set_data should have been called");
1011            let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1012            for val_array in buffer.iter_mut().take(num_values) {
1013                let len: usize =
1014                    read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1015                decoder.start += std::mem::size_of::<u32>();
1016
1017                if data.len() < decoder.start + len {
1018                    return Err(eof_err!("Not enough bytes to decode"));
1019                }
1020
1021                val_array.set_data(data.slice(decoder.start..decoder.start + len));
1022                decoder.start += len;
1023            }
1024            decoder.num_values -= num_values;
1025
1026            Ok(num_values)
1027        }
1028
1029        fn variable_length_bytes(values: &[Self]) -> Option<i64> {
1030            Some(values.iter().map(|x| x.len() as i64).sum())
1031        }
1032
1033        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1034            let data = decoder
1035                .data
1036                .as_mut()
1037                .expect("set_data should have been called");
1038            let num_values = num_values.min(decoder.num_values);
1039
1040            for _ in 0..num_values {
1041                let len: usize =
1042                    read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1043                decoder.start += std::mem::size_of::<u32>() + len;
1044            }
1045            decoder.num_values -= num_values;
1046
1047            Ok(num_values)
1048        }
1049
1050        #[inline]
1051        fn dict_encoding_size(&self) -> (usize, usize) {
1052            (std::mem::size_of::<u32>(), self.len())
1053        }
1054
1055        #[inline]
1056        fn as_any(&self) -> &dyn std::any::Any {
1057            self
1058        }
1059
1060        #[inline]
1061        fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1062            self
1063        }
1064
1065        #[inline]
1066        fn set_from_bytes(&mut self, data: Bytes) {
1067            self.set_data(data);
1068        }
1069    }
1070
1071    impl HeapSize for super::ByteArray {
1072        fn heap_size(&self) -> usize {
1073            // note: this is an estimate, not exact, so just return the size
1074            // of the actual data used, don't try to handle the fact that it may
1075            // be shared.
1076            self.data.as_ref().map(|data| data.len()).unwrap_or(0)
1077        }
1078    }
1079
1080    impl ParquetValueType for super::FixedLenByteArray {
1081        const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
1082
1083        #[inline]
1084        fn encode<W: std::io::Write>(
1085            values: &[Self],
1086            writer: &mut W,
1087            _: &mut BitWriter,
1088        ) -> Result<()> {
1089            for value in values {
1090                let raw = value.data();
1091                writer.write_all(raw)?;
1092            }
1093            Ok(())
1094        }
1095
1096        #[inline]
1097        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1098            decoder.data.replace(data);
1099            decoder.start = 0;
1100            decoder.num_values = num_values;
1101        }
1102
1103        #[inline]
1104        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1105            assert!(decoder.type_length > 0);
1106
1107            let data = decoder
1108                .data
1109                .as_mut()
1110                .expect("set_data should have been called");
1111            let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1112
1113            for item in buffer.iter_mut().take(num_values) {
1114                let len = decoder.type_length as usize;
1115
1116                if data.len() < decoder.start + len {
1117                    return Err(eof_err!("Not enough bytes to decode"));
1118                }
1119
1120                item.set_data(data.slice(decoder.start..decoder.start + len));
1121                decoder.start += len;
1122            }
1123            decoder.num_values -= num_values;
1124
1125            Ok(num_values)
1126        }
1127
1128        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1129            assert!(decoder.type_length > 0);
1130
1131            let data = decoder
1132                .data
1133                .as_mut()
1134                .expect("set_data should have been called");
1135            let num_values = std::cmp::min(num_values, decoder.num_values);
1136            for _ in 0..num_values {
1137                let len = decoder.type_length as usize;
1138
1139                if data.len() < decoder.start + len {
1140                    return Err(eof_err!("Not enough bytes to skip"));
1141                }
1142
1143                decoder.start += len;
1144            }
1145            decoder.num_values -= num_values;
1146
1147            Ok(num_values)
1148        }
1149
1150        #[inline]
1151        fn dict_encoding_size(&self) -> (usize, usize) {
1152            (std::mem::size_of::<u32>(), self.len())
1153        }
1154
1155        #[inline]
1156        fn as_any(&self) -> &dyn std::any::Any {
1157            self
1158        }
1159
1160        #[inline]
1161        fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1162            self
1163        }
1164
1165        #[inline]
1166        fn set_from_bytes(&mut self, data: Bytes) {
1167            self.set_data(data);
1168        }
1169    }
1170
    impl HeapSize for super::FixedLenByteArray {
        /// Delegates to the `heap_size` of the wrapped inner value (field `0`).
        fn heap_size(&self) -> usize {
            self.0.heap_size()
        }
    }
1176}
1177
1178/// Contains the Parquet physical type information as well as the Rust primitive type
1179/// presentation.
1180pub trait DataType: 'static + Send {
1181    /// The physical type of the Parquet data type.
1182    type T: private::ParquetValueType;
1183
1184    /// Returns Parquet physical type.
1185    fn get_physical_type() -> Type {
1186        <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
1187    }
1188
1189    /// Returns size in bytes for Rust representation of the physical type.
1190    fn get_type_size() -> usize;
1191
1192    /// Returns the underlying [`ColumnReaderImpl`] for the given [`ColumnReader`].
1193    fn get_column_reader(column_writer: ColumnReader) -> Option<ColumnReaderImpl<Self>>
1194    where
1195        Self: Sized;
1196
1197    /// Returns the underlying [`ColumnWriterImpl`] for the given [`ColumnWriter`].
1198    fn get_column_writer(column_writer: ColumnWriter<'_>) -> Option<ColumnWriterImpl<'_, Self>>
1199    where
1200        Self: Sized;
1201
1202    /// Returns a reference to the underlying [`ColumnWriterImpl`] for the given [`ColumnWriter`].
1203    fn get_column_writer_ref<'a, 'b: 'a>(
1204        column_writer: &'b ColumnWriter<'a>,
1205    ) -> Option<&'b ColumnWriterImpl<'a, Self>>
1206    where
1207        Self: Sized;
1208
1209    /// Returns a mutable reference to the underlying [`ColumnWriterImpl`] for the given
1210    fn get_column_writer_mut<'a, 'b: 'a>(
1211        column_writer: &'a mut ColumnWriter<'b>,
1212    ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
1213    where
1214        Self: Sized;
1215}
1216
// Workaround bug in specialization
//
// Marker trait with no items of its own: it only names the combination "a `DataType`
// whose value type `T` implements `SliceAsBytes`". It is deprecated and scheduled for
// removal (see the attribute below).
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in 55.0.0"
)]
#[allow(missing_docs)]
pub trait SliceAsBytesDataType: DataType
where
    Self::T: SliceAsBytes,
{
}
1228
// Blanket implementation: every `DataType` whose value type implements `SliceAsBytes`
// automatically implements the (deprecated) marker trait, hence the `allow` below.
#[allow(deprecated)]
impl<T> SliceAsBytesDataType for T
where
    T: DataType,
    <T as DataType>::T: SliceAsBytes,
{
}
1236
/// Declares the marker struct `$name` and implements [`DataType`] for it.
///
/// * `$reader_ident` / `$writer_ident` - the `ColumnReader` / `ColumnWriter` variants that
///   carry the typed reader / writer for this physical type
/// * `$native_ty` - the Rust value type, used as `DataType::T`
/// * `$size` - expression returned by `get_type_size`
macro_rules! make_type {
    ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
        #[doc = concat!("Parquet physical type: ", stringify!($name))]
        #[derive(Clone)]
        pub struct $name {}

        impl DataType for $name {
            type T = $native_ty;

            fn get_type_size() -> usize {
                $size
            }

            // Each accessor below yields the typed implementation only when the enum
            // holds the variant belonging to this physical type, and `None` otherwise.

            fn get_column_reader(column_reader: ColumnReader) -> Option<ColumnReaderImpl<Self>> {
                if let ColumnReader::$reader_ident(typed) = column_reader {
                    Some(typed)
                } else {
                    None
                }
            }

            fn get_column_writer(
                column_writer: ColumnWriter<'_>,
            ) -> Option<ColumnWriterImpl<'_, Self>> {
                if let ColumnWriter::$writer_ident(typed) = column_writer {
                    Some(typed)
                } else {
                    None
                }
            }

            fn get_column_writer_ref<'a, 'b: 'a>(
                column_writer: &'a ColumnWriter<'b>,
            ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
                if let ColumnWriter::$writer_ident(typed) = column_writer {
                    Some(typed)
                } else {
                    None
                }
            }

            fn get_column_writer_mut<'a, 'b: 'a>(
                column_writer: &'a mut ColumnWriter<'b>,
            ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
                if let ColumnWriter::$writer_ident(typed) = column_writer {
                    Some(typed)
                } else {
                    None
                }
            }
        }
    };
}
1286
// Generate struct definitions for all physical types
//
// Arguments, in order: the generated type name, the `ColumnReader` variant, the
// `ColumnWriter` variant, the native Rust value type, and the size in bytes reported by
// `get_type_size`. Primitive types pass a literal size; the struct-backed types use
// `mem::size_of` of their Rust representation.

make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
make_type!(
    Int96Type,
    Int96ColumnReader,
    Int96ColumnWriter,
    Int96,
    mem::size_of::<Int96>()
);
make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
make_type!(
    ByteArrayType,
    ByteArrayColumnReader,
    ByteArrayColumnWriter,
    ByteArray,
    mem::size_of::<ByteArray>()
);
make_type!(
    FixedLenByteArrayType,
    FixedLenByteArrayColumnReader,
    FixedLenByteArrayColumnWriter,
    FixedLenByteArray,
    mem::size_of::<FixedLenByteArray>()
);
1315
/// Lets a `ByteArray` be passed wherever an `AsRef<[u8]>` is accepted.
impl AsRef<[u8]> for ByteArray {
    /// Returns the value's bytes, as produced by `as_bytes`.
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}
1321
/// Lets a `FixedLenByteArray` be passed wherever an `AsRef<[u8]>` is accepted.
impl AsRef<[u8]> for FixedLenByteArray {
    /// Returns the value's bytes, as produced by `as_bytes`.
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}
1327
/// Checks the physical type of the in-scope type parameter `T` against a `|`-separated list
/// of patterns, panicking with the literal message `$err` when none of them matches.
macro_rules! ensure_phys_ty {
    ($($ty:pat_param)|+ , $err: literal) => {
        match T::get_physical_type() {
            // All accepted physical types share a single no-op arm
            $($ty)|+ => (),
            _ => panic!($err),
        };
    }
}
1337
#[cfg(test)]
mod tests {
    use super::*;

    /// `as_bytes` exposes the raw byte representation of each value type.
    #[test]
    fn test_as_bytes() {
        // Int96: three u32 words laid out one after another
        let int96 = Int96::from(vec![1, 2, 3]);
        assert_eq!(int96.as_bytes(), &[1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]);

        // ByteArray: the bytes as given
        let byte_array = ByteArray::from(vec![1, 2, 3]);
        assert_eq!(byte_array.as_bytes(), &[1, 2, 3]);

        // Decimal: 4 bytes from an i32, 8 bytes from an i64, verbatim from a byte array
        let from_i32 = Decimal::from_i32(123, 5, 2);
        assert_eq!(from_i32.as_bytes(), &[0, 0, 0, 123]);
        let from_i64 = Decimal::from_i64(123, 5, 2);
        assert_eq!(from_i64.as_bytes(), &[0, 0, 0, 0, 0, 0, 0, 123]);
        let from_bytes = Decimal::from_bytes(ByteArray::from(vec![1, 2, 3]), 5, 2);
        assert_eq!(from_bytes.as_bytes(), &[1, 2, 3]);
    }

    /// Building an `Int96` from a vector keeps the three words in order.
    #[test]
    fn test_int96_from() {
        let int96 = Int96::from(vec![1, 12345, 1234567890]);
        assert_eq!(int96.data(), &[1, 12345, 1234567890]);
    }

    /// `ByteArray` can be built from byte vectors, string slices and `Bytes`.
    #[test]
    fn test_byte_array_from() {
        assert_eq!(ByteArray::from(b"ABC".to_vec()).data(), b"ABC");
        assert_eq!(ByteArray::from("ABC").data(), b"ABC");

        let from_bytes = ByteArray::from(Bytes::from(vec![1u8, 2u8, 3u8, 4u8, 5u8]));
        assert_eq!(from_bytes.data(), &[1u8, 2u8, 3u8, 4u8, 5u8]);

        let owned = vec![6u8, 7u8, 8u8, 9u8, 10u8];
        assert_eq!(ByteArray::from(owned).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
    }

    /// Decimals are equal only when value, precision, scale and source all agree.
    #[test]
    fn test_decimal_partial_eq() {
        assert_eq!(Decimal::default(), Decimal::from_i32(0, 0, 0));
        assert_eq!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 2));
        assert_eq!(
            Decimal::from_bytes(ByteArray::from(vec![0, 0, 0, 3]), 5, 2),
            Decimal::from_i32(3, 5, 2)
        );

        // Differing value, precision or scale
        assert_ne!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(111, 5, 2));
        assert_ne!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 6, 2));
        assert_ne!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 3));

        // Same numbers but built from an i64 instead of an i32
        assert_ne!(Decimal::from_i64(222, 5, 2), Decimal::from_i32(222, 5, 2));
    }

    /// Ordering and equality of `ByteArray` values.
    #[test]
    fn test_byte_array_ord() {
        let base = ByteArray::from(vec![1, 2, 3]);
        let same_as_base = ByteArray::from(vec![1, 2, 3]);
        let shorter_but_larger = ByteArray::from(vec![3, 4]);
        let larger_last_byte = ByteArray::from(vec![1, 2, 4]);
        let empty = ByteArray::from(vec![]);
        let larger_first_byte = ByteArray::from(vec![2, 2, 3]);

        assert!(base < shorter_but_larger);
        assert!(larger_last_byte > base);
        assert!(base > empty);
        assert_eq!(base, same_as_base);
        assert!(larger_first_byte > base);
    }
}