1use bytes::Bytes;
21use half::f16;
22use std::cmp::Ordering;
23use std::fmt;
24use std::mem;
25use std::ops::{Deref, DerefMut};
26use std::str::from_utf8;
27
28use crate::basic::Type;
29use crate::column::reader::{ColumnReader, ColumnReaderImpl};
30use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
31use crate::errors::{ParquetError, Result};
32use crate::util::bit_util::FromBytes;
33
/// Rust representation of the Parquet `INT96` physical type.
///
/// The value is stored as three `u32` words. When interpreted as a timestamp,
/// `value[0]` and `value[1]` are the low and high halves of a 64-bit
/// nanoseconds-within-day count and `value[2]` is a Julian day number.
///
/// Ordering is chronological: values are compared by day first and then by
/// nanoseconds within the day. A derived, word-by-word comparison would look
/// at the low nanosecond word first and could rank a later timestamp before an
/// earlier one.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Int96 {
    value: [u32; 3],
}

impl PartialOrd for Int96 {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Int96 {
    /// Compares by Julian day, then by nanoseconds within the day.
    fn cmp(&self, other: &Self) -> Ordering {
        // Same decomposition the timestamp conversions use: the day word is
        // reinterpreted as `i32`, the two low words form an `i64`.
        let key = |v: &Int96| {
            let day = v.value[2] as i32;
            let nanos = ((v.value[1] as i64) << 32) + v.value[0] as i64;
            (day, nanos)
        };
        key(self).cmp(&key(other))
    }
}
40
/// Julian day number corresponding to the Unix epoch (1970-01-01).
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;

/// Number of seconds in one day.
const SECONDS_IN_DAY: i64 = 24 * 60 * 60;
/// Number of milliseconds in one second.
const MILLISECONDS: i64 = 1_000;
/// Number of microseconds in one second.
const MICROSECONDS: i64 = 1_000 * MILLISECONDS;
/// Number of nanoseconds in one second.
const NANOSECONDS: i64 = 1_000 * MICROSECONDS;

/// Number of milliseconds in one day.
const MILLISECONDS_IN_DAY: i64 = MILLISECONDS * SECONDS_IN_DAY;
/// Number of microseconds in one day.
const MICROSECONDS_IN_DAY: i64 = MICROSECONDS * SECONDS_IN_DAY;
/// Number of nanoseconds in one day.
const NANOSECONDS_IN_DAY: i64 = NANOSECONDS * SECONDS_IN_DAY;
58
59impl Int96 {
60 pub fn new() -> Self {
62 Self { value: [0; 3] }
63 }
64
65 #[inline]
67 pub fn data(&self) -> &[u32] {
68 &self.value
69 }
70
71 #[inline]
73 pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) {
74 self.value = [elem0, elem1, elem2];
75 }
76
77 #[deprecated(since = "54.0.0", note = "Use `to_millis` instead")]
79 pub fn to_i64(&self) -> i64 {
80 self.to_millis()
81 }
82
83 #[inline]
87 pub fn to_seconds(&self) -> i64 {
88 let (day, nanos) = self.data_as_days_and_nanos();
89 (day as i64 - JULIAN_DAY_OF_EPOCH)
90 .wrapping_mul(SECONDS_IN_DAY)
91 .wrapping_add(nanos / 1_000_000_000)
92 }
93
94 #[inline]
98 pub fn to_millis(&self) -> i64 {
99 let (day, nanos) = self.data_as_days_and_nanos();
100 (day as i64 - JULIAN_DAY_OF_EPOCH)
101 .wrapping_mul(MILLISECONDS_IN_DAY)
102 .wrapping_add(nanos / 1_000_000)
103 }
104
105 #[inline]
109 pub fn to_micros(&self) -> i64 {
110 let (day, nanos) = self.data_as_days_and_nanos();
111 (day as i64 - JULIAN_DAY_OF_EPOCH)
112 .wrapping_mul(MICROSECONDS_IN_DAY)
113 .wrapping_add(nanos / 1_000)
114 }
115
116 #[inline]
120 pub fn to_nanos(&self) -> i64 {
121 let (day, nanos) = self.data_as_days_and_nanos();
122 (day as i64 - JULIAN_DAY_OF_EPOCH)
123 .wrapping_mul(NANOSECONDS_IN_DAY)
124 .wrapping_add(nanos)
125 }
126
127 #[inline]
128 fn data_as_days_and_nanos(&self) -> (i32, i64) {
129 let day = self.data()[2] as i32;
130 let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
131 (day, nanos)
132 }
133}
134
135impl From<Vec<u32>> for Int96 {
136 fn from(buf: Vec<u32>) -> Self {
137 assert_eq!(buf.len(), 3);
138 let mut result = Self::new();
139 result.set_data(buf[0], buf[1], buf[2]);
140 result
141 }
142}
143
144impl fmt::Display for Int96 {
145 #[cold]
146 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
147 write!(f, "{:?}", self.data())
148 }
149}
150
151#[derive(Clone, Default)]
154pub struct ByteArray {
155 data: Option<Bytes>,
156}
157
158impl std::fmt::Debug for ByteArray {
160 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161 let mut debug_struct = f.debug_struct("ByteArray");
162 match self.as_utf8() {
163 Ok(s) => debug_struct.field("data", &s),
164 Err(_) => debug_struct.field("data", &self.data),
165 };
166 debug_struct.finish()
167 }
168}
169
170impl PartialOrd for ByteArray {
171 fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
172 match (&self.data, &other.data) {
177 (None, None) => Some(Ordering::Equal),
178 (None, Some(_)) => Some(Ordering::Less),
179 (Some(_), None) => Some(Ordering::Greater),
180 (Some(self_data), Some(other_data)) => {
181 self_data.partial_cmp(&other_data)
183 }
184 }
185 }
186}
187
188impl ByteArray {
189 #[inline]
191 pub fn new() -> Self {
192 ByteArray { data: None }
193 }
194
195 #[inline]
197 pub fn len(&self) -> usize {
198 assert!(self.data.is_some());
199 self.data.as_ref().unwrap().len()
200 }
201
202 #[inline]
204 pub fn is_empty(&self) -> bool {
205 self.len() == 0
206 }
207
208 #[inline]
210 pub fn data(&self) -> &[u8] {
211 self.data
212 .as_ref()
213 .expect("set_data should have been called")
214 .as_ref()
215 }
216
217 #[inline]
219 pub fn set_data(&mut self, data: Bytes) {
220 self.data = Some(data);
221 }
222
223 #[inline]
225 pub fn slice(&self, start: usize, len: usize) -> Self {
226 Self::from(
227 self.data
228 .as_ref()
229 .expect("set_data should have been called")
230 .slice(start..start + len),
231 )
232 }
233
234 pub fn as_utf8(&self) -> Result<&str> {
236 self.data
237 .as_ref()
238 .map(|ptr| ptr.as_ref())
239 .ok_or_else(|| general_err!("Can't convert empty byte array to utf8"))
240 .and_then(|bytes| from_utf8(bytes).map_err(|e| e.into()))
241 }
242}
243
244impl From<Vec<u8>> for ByteArray {
245 fn from(buf: Vec<u8>) -> ByteArray {
246 Self {
247 data: Some(buf.into()),
248 }
249 }
250}
251
252impl<'a> From<&'a [u8]> for ByteArray {
253 fn from(b: &'a [u8]) -> ByteArray {
254 let mut v = Vec::new();
255 v.extend_from_slice(b);
256 Self {
257 data: Some(v.into()),
258 }
259 }
260}
261
262impl<'a> From<&'a str> for ByteArray {
263 fn from(s: &'a str) -> ByteArray {
264 let mut v = Vec::new();
265 v.extend_from_slice(s.as_bytes());
266 Self {
267 data: Some(v.into()),
268 }
269 }
270}
271
272impl From<Bytes> for ByteArray {
273 fn from(value: Bytes) -> Self {
274 Self { data: Some(value) }
275 }
276}
277
278impl From<f16> for ByteArray {
279 fn from(value: f16) -> Self {
280 Self::from(value.to_le_bytes().as_slice())
281 }
282}
283
284impl PartialEq for ByteArray {
285 fn eq(&self, other: &ByteArray) -> bool {
286 match (&self.data, &other.data) {
287 (Some(d1), Some(d2)) => d1.as_ref() == d2.as_ref(),
288 (None, None) => true,
289 _ => false,
290 }
291 }
292}
293
294impl fmt::Display for ByteArray {
295 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
296 write!(f, "{:?}", self.data())
297 }
298}
299
300#[repr(transparent)]
315#[derive(Clone, Debug, Default)]
316pub struct FixedLenByteArray(ByteArray);
317
318impl PartialEq for FixedLenByteArray {
319 fn eq(&self, other: &FixedLenByteArray) -> bool {
320 self.0.eq(&other.0)
321 }
322}
323
324impl PartialEq<ByteArray> for FixedLenByteArray {
325 fn eq(&self, other: &ByteArray) -> bool {
326 self.0.eq(other)
327 }
328}
329
330impl PartialEq<FixedLenByteArray> for ByteArray {
331 fn eq(&self, other: &FixedLenByteArray) -> bool {
332 self.eq(&other.0)
333 }
334}
335
336impl fmt::Display for FixedLenByteArray {
337 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
338 self.0.fmt(f)
339 }
340}
341
342impl PartialOrd for FixedLenByteArray {
343 fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
344 self.0.partial_cmp(&other.0)
345 }
346}
347
348impl PartialOrd<FixedLenByteArray> for ByteArray {
349 fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
350 self.partial_cmp(&other.0)
351 }
352}
353
354impl PartialOrd<ByteArray> for FixedLenByteArray {
355 fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
356 self.0.partial_cmp(other)
357 }
358}
359
360impl Deref for FixedLenByteArray {
361 type Target = ByteArray;
362
363 fn deref(&self) -> &Self::Target {
364 &self.0
365 }
366}
367
368impl DerefMut for FixedLenByteArray {
369 fn deref_mut(&mut self) -> &mut Self::Target {
370 &mut self.0
371 }
372}
373
374impl From<ByteArray> for FixedLenByteArray {
375 fn from(other: ByteArray) -> Self {
376 Self(other)
377 }
378}
379
380impl From<Vec<u8>> for FixedLenByteArray {
381 fn from(buf: Vec<u8>) -> FixedLenByteArray {
382 FixedLenByteArray(ByteArray::from(buf))
383 }
384}
385
386impl From<FixedLenByteArray> for ByteArray {
387 fn from(other: FixedLenByteArray) -> Self {
388 other.0
389 }
390}
391
392#[derive(Clone, Debug)]
398pub enum Decimal {
399 Int32 {
401 value: [u8; 4],
403 precision: i32,
405 scale: i32,
407 },
408 Int64 {
410 value: [u8; 8],
412 precision: i32,
414 scale: i32,
416 },
417 Bytes {
419 value: ByteArray,
421 precision: i32,
423 scale: i32,
425 },
426}
427
428impl Decimal {
429 pub fn from_i32(value: i32, precision: i32, scale: i32) -> Self {
431 let bytes = value.to_be_bytes();
432 Decimal::Int32 {
433 value: bytes,
434 precision,
435 scale,
436 }
437 }
438
439 pub fn from_i64(value: i64, precision: i32, scale: i32) -> Self {
441 let bytes = value.to_be_bytes();
442 Decimal::Int64 {
443 value: bytes,
444 precision,
445 scale,
446 }
447 }
448
449 pub fn from_bytes(value: ByteArray, precision: i32, scale: i32) -> Self {
451 Decimal::Bytes {
452 value,
453 precision,
454 scale,
455 }
456 }
457
458 pub fn data(&self) -> &[u8] {
460 match *self {
461 Decimal::Int32 { ref value, .. } => value,
462 Decimal::Int64 { ref value, .. } => value,
463 Decimal::Bytes { ref value, .. } => value.data(),
464 }
465 }
466
467 pub fn precision(&self) -> i32 {
469 match *self {
470 Decimal::Int32 { precision, .. } => precision,
471 Decimal::Int64 { precision, .. } => precision,
472 Decimal::Bytes { precision, .. } => precision,
473 }
474 }
475
476 pub fn scale(&self) -> i32 {
478 match *self {
479 Decimal::Int32 { scale, .. } => scale,
480 Decimal::Int64 { scale, .. } => scale,
481 Decimal::Bytes { scale, .. } => scale,
482 }
483 }
484}
485
486impl Default for Decimal {
487 fn default() -> Self {
488 Self::from_i32(0, 0, 0)
489 }
490}
491
492impl PartialEq for Decimal {
493 fn eq(&self, other: &Decimal) -> bool {
494 self.precision() == other.precision()
495 && self.scale() == other.scale()
496 && self.data() == other.data()
497 }
498}
499
/// Converts a value into a borrowed view of its bytes.
pub trait AsBytes {
    /// Returns the bytes of `self` as a slice.
    fn as_bytes(&self) -> &[u8];
}

/// Converts a slice of values into a borrowed view of its bytes.
pub trait SliceAsBytes: Sized {
    /// Returns the bytes of the slice `self_`.
    fn slice_as_bytes(self_: &[Self]) -> &[u8];

    /// Returns the bytes of the slice `self_` for in-place modification.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably the caller must only write bytes that leave
    /// every element a valid value of `Self` — confirm against callers.
    unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8];
}

impl AsBytes for [u8] {
    /// A byte slice is already its own byte view.
    fn as_bytes(&self) -> &[u8] {
        self
    }
}
523
524macro_rules! gen_as_bytes {
525 ($source_ty:ident) => {
526 impl AsBytes for $source_ty {
527 #[allow(clippy::size_of_in_element_count)]
528 fn as_bytes(&self) -> &[u8] {
529 unsafe {
532 std::slice::from_raw_parts(
533 self as *const $source_ty as *const u8,
534 std::mem::size_of::<$source_ty>(),
535 )
536 }
537 }
538 }
539
540 impl SliceAsBytes for $source_ty {
541 #[inline]
542 #[allow(clippy::size_of_in_element_count)]
543 fn slice_as_bytes(self_: &[Self]) -> &[u8] {
544 unsafe {
547 std::slice::from_raw_parts(
548 self_.as_ptr() as *const u8,
549 std::mem::size_of_val(self_),
550 )
551 }
552 }
553
554 #[inline]
555 #[allow(clippy::size_of_in_element_count)]
556 unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] {
557 unsafe {
561 std::slice::from_raw_parts_mut(
562 self_.as_mut_ptr() as *mut u8,
563 std::mem::size_of_val(self_),
564 )
565 }
566 }
567 }
568 };
569}
570
571gen_as_bytes!(i8);
572gen_as_bytes!(i16);
573gen_as_bytes!(i32);
574gen_as_bytes!(i64);
575gen_as_bytes!(u8);
576gen_as_bytes!(u16);
577gen_as_bytes!(u32);
578gen_as_bytes!(u64);
579gen_as_bytes!(f32);
580gen_as_bytes!(f64);
581
582macro_rules! unimplemented_slice_as_bytes {
583 ($ty: ty) => {
584 impl SliceAsBytes for $ty {
585 fn slice_as_bytes(_self: &[Self]) -> &[u8] {
586 unimplemented!()
587 }
588
589 unsafe fn slice_as_bytes_mut(_self: &mut [Self]) -> &mut [u8] {
590 unimplemented!()
591 }
592 }
593 };
594}
595
596unimplemented_slice_as_bytes!(Int96);
598unimplemented_slice_as_bytes!(bool);
599unimplemented_slice_as_bytes!(ByteArray);
600unimplemented_slice_as_bytes!(FixedLenByteArray);
601
602impl AsBytes for bool {
603 fn as_bytes(&self) -> &[u8] {
604 unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) }
607 }
608}
609
610impl AsBytes for Int96 {
611 fn as_bytes(&self) -> &[u8] {
612 unsafe { std::slice::from_raw_parts(self.data() as *const [u32] as *const u8, 12) }
614 }
615}
616
617impl AsBytes for ByteArray {
618 fn as_bytes(&self) -> &[u8] {
619 self.data()
620 }
621}
622
623impl AsBytes for FixedLenByteArray {
624 fn as_bytes(&self) -> &[u8] {
625 self.data()
626 }
627}
628
629impl AsBytes for Decimal {
630 fn as_bytes(&self) -> &[u8] {
631 self.data()
632 }
633}
634
635impl AsBytes for Vec<u8> {
636 fn as_bytes(&self) -> &[u8] {
637 self.as_slice()
638 }
639}
640
641impl AsBytes for &str {
642 fn as_bytes(&self) -> &[u8] {
643 (self as &str).as_bytes()
644 }
645}
646
647impl AsBytes for str {
648 fn as_bytes(&self) -> &[u8] {
649 (self as &str).as_bytes()
650 }
651}
652
653pub(crate) mod private {
654 use bytes::Bytes;
655
656 use crate::encodings::decoding::PlainDecoderDetails;
657 use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter};
658
659 use super::{ParquetError, Result, SliceAsBytes};
660 use crate::basic::Type;
661 use crate::file::metadata::HeapSize;
662
663 pub trait ParquetValueType:
669 PartialEq
670 + std::fmt::Debug
671 + std::fmt::Display
672 + Default
673 + Clone
674 + super::AsBytes
675 + super::FromBytes
676 + SliceAsBytes
677 + PartialOrd
678 + Send
679 + HeapSize
680 + crate::encodings::decoding::private::GetDecoder
681 + crate::file::statistics::private::MakeStatistics
682 {
683 const PHYSICAL_TYPE: Type;
684
685 fn encode<W: std::io::Write>(
687 values: &[Self],
688 writer: &mut W,
689 bit_writer: &mut BitWriter,
690 ) -> Result<()>;
691
692 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize);
694
695 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize>;
697
698 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize>;
699
700 fn dict_encoding_size(&self) -> (usize, usize) {
702 (std::mem::size_of::<Self>(), 1)
703 }
704
705 fn variable_length_bytes(_: &[Self]) -> Option<i64> {
709 None
710 }
711
712 fn as_i64(&self) -> Result<i64> {
717 Err(general_err!("Type cannot be converted to i64"))
718 }
719
720 fn as_u64(&self) -> Result<u64> {
725 self.as_i64()
726 .map_err(|_| general_err!("Type cannot be converted to u64"))
727 .map(|x| x as u64)
728 }
729
730 fn as_any(&self) -> &dyn std::any::Any;
732
733 fn as_mut_any(&mut self) -> &mut dyn std::any::Any;
735
736 fn set_from_bytes(&mut self, _data: Bytes) {
740 unimplemented!();
741 }
742 }
743
744 impl ParquetValueType for bool {
745 const PHYSICAL_TYPE: Type = Type::BOOLEAN;
746
747 #[inline]
748 fn encode<W: std::io::Write>(
749 values: &[Self],
750 _: &mut W,
751 bit_writer: &mut BitWriter,
752 ) -> Result<()> {
753 for value in values {
754 bit_writer.put_value(*value as u64, 1)
755 }
756 Ok(())
757 }
758
759 #[inline]
760 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
761 decoder.bit_reader.replace(BitReader::new(data));
762 decoder.num_values = num_values;
763 }
764
765 #[inline]
766 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
767 let bit_reader = decoder.bit_reader.as_mut().unwrap();
768 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
769 let values_read = bit_reader.get_batch(&mut buffer[..num_values], 1);
770 decoder.num_values -= values_read;
771 Ok(values_read)
772 }
773
774 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
775 let bit_reader = decoder.bit_reader.as_mut().unwrap();
776 let num_values = std::cmp::min(num_values, decoder.num_values);
777 let values_read = bit_reader.skip(num_values, 1);
778 decoder.num_values -= values_read;
779 Ok(values_read)
780 }
781
782 #[inline]
783 fn as_i64(&self) -> Result<i64> {
784 Ok(*self as i64)
785 }
786
787 #[inline]
788 fn as_any(&self) -> &dyn std::any::Any {
789 self
790 }
791
792 #[inline]
793 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
794 self
795 }
796 }
797
798 macro_rules! impl_from_raw {
799 ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
800 impl ParquetValueType for $ty {
801 const PHYSICAL_TYPE: Type = $physical_ty;
802
803 #[inline]
804 fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
805 let raw = unsafe {
807 std::slice::from_raw_parts(
808 values.as_ptr() as *const u8,
809 std::mem::size_of_val(values),
810 )
811 };
812 writer.write_all(raw)?;
813
814 Ok(())
815 }
816
817 #[inline]
818 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
819 decoder.data.replace(data);
820 decoder.start = 0;
821 decoder.num_values = num_values;
822 }
823
824 #[inline]
825 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
826 let data = decoder.data.as_ref().expect("set_data should have been called");
827 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
828 let bytes_left = data.len() - decoder.start;
829 let bytes_to_decode = std::mem::size_of::<Self>() * num_values;
830
831 if bytes_left < bytes_to_decode {
832 return Err(eof_err!("Not enough bytes to decode"));
833 }
834
835 {
836 let raw_buffer = &mut unsafe { Self::slice_as_bytes_mut(buffer) }[..bytes_to_decode];
839 raw_buffer.copy_from_slice(data.slice(
840 decoder.start..decoder.start + bytes_to_decode
841 ).as_ref());
842 };
843 decoder.start += bytes_to_decode;
844 decoder.num_values -= num_values;
845
846 Ok(num_values)
847 }
848
849 #[inline]
850 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
851 let data = decoder.data.as_ref().expect("set_data should have been called");
852 let num_values = num_values.min(decoder.num_values);
853 let bytes_left = data.len() - decoder.start;
854 let bytes_to_skip = std::mem::size_of::<Self>() * num_values;
855
856 if bytes_left < bytes_to_skip {
857 return Err(eof_err!("Not enough bytes to skip"));
858 }
859
860 decoder.start += bytes_to_skip;
861 decoder.num_values -= num_values;
862
863 Ok(num_values)
864 }
865
866 #[inline]
867 fn as_i64(&$self) -> Result<i64> {
868 $as_i64
869 }
870
871 #[inline]
872 fn as_any(&self) -> &dyn std::any::Any {
873 self
874 }
875
876 #[inline]
877 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
878 self
879 }
880 }
881 }
882 }
883
884 impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
885 impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
886 impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
887 impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
888
889 impl ParquetValueType for super::Int96 {
890 const PHYSICAL_TYPE: Type = Type::INT96;
891
892 #[inline]
893 fn encode<W: std::io::Write>(
894 values: &[Self],
895 writer: &mut W,
896 _: &mut BitWriter,
897 ) -> Result<()> {
898 for value in values {
899 let raw = SliceAsBytes::slice_as_bytes(value.data());
900 writer.write_all(raw)?;
901 }
902 Ok(())
903 }
904
905 #[inline]
906 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
907 decoder.data.replace(data);
908 decoder.start = 0;
909 decoder.num_values = num_values;
910 }
911
912 #[inline]
913 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
914 let data = decoder
916 .data
917 .as_ref()
918 .expect("set_data should have been called");
919 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
920 let bytes_left = data.len() - decoder.start;
921 let bytes_to_decode = 12 * num_values;
922
923 if bytes_left < bytes_to_decode {
924 return Err(eof_err!("Not enough bytes to decode"));
925 }
926
927 let data_range = data.slice(decoder.start..decoder.start + bytes_to_decode);
928 let bytes: &[u8] = &data_range;
929 decoder.start += bytes_to_decode;
930
931 let mut pos = 0; for item in buffer.iter_mut().take(num_values) {
933 let elem0 = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
934 let elem1 = u32::from_le_bytes(bytes[pos + 4..pos + 8].try_into().unwrap());
935 let elem2 = u32::from_le_bytes(bytes[pos + 8..pos + 12].try_into().unwrap());
936
937 item.set_data(elem0, elem1, elem2);
938 pos += 12;
939 }
940 decoder.num_values -= num_values;
941
942 Ok(num_values)
943 }
944
945 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
946 let data = decoder
947 .data
948 .as_ref()
949 .expect("set_data should have been called");
950 let num_values = std::cmp::min(num_values, decoder.num_values);
951 let bytes_left = data.len() - decoder.start;
952 let bytes_to_skip = 12 * num_values;
953
954 if bytes_left < bytes_to_skip {
955 return Err(eof_err!("Not enough bytes to skip"));
956 }
957 decoder.start += bytes_to_skip;
958 decoder.num_values -= num_values;
959
960 Ok(num_values)
961 }
962
963 #[inline]
964 fn as_any(&self) -> &dyn std::any::Any {
965 self
966 }
967
968 #[inline]
969 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
970 self
971 }
972 }
973
974 impl HeapSize for super::Int96 {
975 fn heap_size(&self) -> usize {
976 0 }
978 }
979
980 impl ParquetValueType for super::ByteArray {
981 const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
982
983 #[inline]
984 fn encode<W: std::io::Write>(
985 values: &[Self],
986 writer: &mut W,
987 _: &mut BitWriter,
988 ) -> Result<()> {
989 for value in values {
990 let len: u32 = value.len().try_into().unwrap();
991 writer.write_all(&len.to_ne_bytes())?;
992 let raw = value.data();
993 writer.write_all(raw)?;
994 }
995 Ok(())
996 }
997
998 #[inline]
999 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1000 decoder.data.replace(data);
1001 decoder.start = 0;
1002 decoder.num_values = num_values;
1003 }
1004
1005 #[inline]
1006 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1007 let data = decoder
1008 .data
1009 .as_mut()
1010 .expect("set_data should have been called");
1011 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1012 for val_array in buffer.iter_mut().take(num_values) {
1013 let len: usize =
1014 read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1015 decoder.start += std::mem::size_of::<u32>();
1016
1017 if data.len() < decoder.start + len {
1018 return Err(eof_err!("Not enough bytes to decode"));
1019 }
1020
1021 val_array.set_data(data.slice(decoder.start..decoder.start + len));
1022 decoder.start += len;
1023 }
1024 decoder.num_values -= num_values;
1025
1026 Ok(num_values)
1027 }
1028
1029 fn variable_length_bytes(values: &[Self]) -> Option<i64> {
1030 Some(values.iter().map(|x| x.len() as i64).sum())
1031 }
1032
1033 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1034 let data = decoder
1035 .data
1036 .as_mut()
1037 .expect("set_data should have been called");
1038 let num_values = num_values.min(decoder.num_values);
1039
1040 for _ in 0..num_values {
1041 let len: usize =
1042 read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1043 decoder.start += std::mem::size_of::<u32>() + len;
1044 }
1045 decoder.num_values -= num_values;
1046
1047 Ok(num_values)
1048 }
1049
1050 #[inline]
1051 fn dict_encoding_size(&self) -> (usize, usize) {
1052 (std::mem::size_of::<u32>(), self.len())
1053 }
1054
1055 #[inline]
1056 fn as_any(&self) -> &dyn std::any::Any {
1057 self
1058 }
1059
1060 #[inline]
1061 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1062 self
1063 }
1064
1065 #[inline]
1066 fn set_from_bytes(&mut self, data: Bytes) {
1067 self.set_data(data);
1068 }
1069 }
1070
1071 impl HeapSize for super::ByteArray {
1072 fn heap_size(&self) -> usize {
1073 self.data.as_ref().map(|data| data.len()).unwrap_or(0)
1077 }
1078 }
1079
1080 impl ParquetValueType for super::FixedLenByteArray {
1081 const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
1082
1083 #[inline]
1084 fn encode<W: std::io::Write>(
1085 values: &[Self],
1086 writer: &mut W,
1087 _: &mut BitWriter,
1088 ) -> Result<()> {
1089 for value in values {
1090 let raw = value.data();
1091 writer.write_all(raw)?;
1092 }
1093 Ok(())
1094 }
1095
1096 #[inline]
1097 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1098 decoder.data.replace(data);
1099 decoder.start = 0;
1100 decoder.num_values = num_values;
1101 }
1102
1103 #[inline]
1104 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1105 assert!(decoder.type_length > 0);
1106
1107 let data = decoder
1108 .data
1109 .as_mut()
1110 .expect("set_data should have been called");
1111 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1112
1113 for item in buffer.iter_mut().take(num_values) {
1114 let len = decoder.type_length as usize;
1115
1116 if data.len() < decoder.start + len {
1117 return Err(eof_err!("Not enough bytes to decode"));
1118 }
1119
1120 item.set_data(data.slice(decoder.start..decoder.start + len));
1121 decoder.start += len;
1122 }
1123 decoder.num_values -= num_values;
1124
1125 Ok(num_values)
1126 }
1127
1128 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1129 assert!(decoder.type_length > 0);
1130
1131 let data = decoder
1132 .data
1133 .as_mut()
1134 .expect("set_data should have been called");
1135 let num_values = std::cmp::min(num_values, decoder.num_values);
1136 for _ in 0..num_values {
1137 let len = decoder.type_length as usize;
1138
1139 if data.len() < decoder.start + len {
1140 return Err(eof_err!("Not enough bytes to skip"));
1141 }
1142
1143 decoder.start += len;
1144 }
1145 decoder.num_values -= num_values;
1146
1147 Ok(num_values)
1148 }
1149
1150 #[inline]
1151 fn dict_encoding_size(&self) -> (usize, usize) {
1152 (std::mem::size_of::<u32>(), self.len())
1153 }
1154
1155 #[inline]
1156 fn as_any(&self) -> &dyn std::any::Any {
1157 self
1158 }
1159
1160 #[inline]
1161 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1162 self
1163 }
1164
1165 #[inline]
1166 fn set_from_bytes(&mut self, data: Bytes) {
1167 self.set_data(data);
1168 }
1169 }
1170
1171 impl HeapSize for super::FixedLenByteArray {
1172 fn heap_size(&self) -> usize {
1173 self.0.heap_size()
1174 }
1175 }
1176}
1177
1178pub trait DataType: 'static + Send {
1181 type T: private::ParquetValueType;
1183
1184 fn get_physical_type() -> Type {
1186 <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
1187 }
1188
1189 fn get_type_size() -> usize;
1191
1192 fn get_column_reader(column_writer: ColumnReader) -> Option<ColumnReaderImpl<Self>>
1194 where
1195 Self: Sized;
1196
1197 fn get_column_writer(column_writer: ColumnWriter<'_>) -> Option<ColumnWriterImpl<'_, Self>>
1199 where
1200 Self: Sized;
1201
1202 fn get_column_writer_ref<'a, 'b: 'a>(
1204 column_writer: &'b ColumnWriter<'a>,
1205 ) -> Option<&'b ColumnWriterImpl<'a, Self>>
1206 where
1207 Self: Sized;
1208
1209 fn get_column_writer_mut<'a, 'b: 'a>(
1211 column_writer: &'a mut ColumnWriter<'b>,
1212 ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
1213 where
1214 Self: Sized;
1215}
1216
1217#[deprecated(
1219 since = "54.0.0",
1220 note = "Seems like a stray and nobody knows what's it for. Will be removed in 55.0.0"
1221)]
1222#[allow(missing_docs)]
1223pub trait SliceAsBytesDataType: DataType
1224where
1225 Self::T: SliceAsBytes,
1226{
1227}
1228
1229#[allow(deprecated)]
1230impl<T> SliceAsBytesDataType for T
1231where
1232 T: DataType,
1233 <T as DataType>::T: SliceAsBytes,
1234{
1235}
1236
1237macro_rules! make_type {
1238 ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
1239 #[doc = concat!("Parquet physical type: ", stringify!($name))]
1240 #[derive(Clone)]
1241 pub struct $name {}
1242
1243 impl DataType for $name {
1244 type T = $native_ty;
1245
1246 fn get_type_size() -> usize {
1247 $size
1248 }
1249
1250 fn get_column_reader(column_reader: ColumnReader) -> Option<ColumnReaderImpl<Self>> {
1251 match column_reader {
1252 ColumnReader::$reader_ident(w) => Some(w),
1253 _ => None,
1254 }
1255 }
1256
1257 fn get_column_writer(
1258 column_writer: ColumnWriter<'_>,
1259 ) -> Option<ColumnWriterImpl<'_, Self>> {
1260 match column_writer {
1261 ColumnWriter::$writer_ident(w) => Some(w),
1262 _ => None,
1263 }
1264 }
1265
1266 fn get_column_writer_ref<'a, 'b: 'a>(
1267 column_writer: &'a ColumnWriter<'b>,
1268 ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
1269 match column_writer {
1270 ColumnWriter::$writer_ident(w) => Some(w),
1271 _ => None,
1272 }
1273 }
1274
1275 fn get_column_writer_mut<'a, 'b: 'a>(
1276 column_writer: &'a mut ColumnWriter<'b>,
1277 ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
1278 match column_writer {
1279 ColumnWriter::$writer_ident(w) => Some(w),
1280 _ => None,
1281 }
1282 }
1283 }
1284 };
1285}
1286
1287make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
1290make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
1291make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
1292make_type!(
1293 Int96Type,
1294 Int96ColumnReader,
1295 Int96ColumnWriter,
1296 Int96,
1297 mem::size_of::<Int96>()
1298);
1299make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
1300make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
1301make_type!(
1302 ByteArrayType,
1303 ByteArrayColumnReader,
1304 ByteArrayColumnWriter,
1305 ByteArray,
1306 mem::size_of::<ByteArray>()
1307);
1308make_type!(
1309 FixedLenByteArrayType,
1310 FixedLenByteArrayColumnReader,
1311 FixedLenByteArrayColumnWriter,
1312 FixedLenByteArray,
1313 mem::size_of::<FixedLenByteArray>()
1314);
1315
1316impl AsRef<[u8]> for ByteArray {
1317 fn as_ref(&self) -> &[u8] {
1318 self.as_bytes()
1319 }
1320}
1321
1322impl AsRef<[u8]> for FixedLenByteArray {
1323 fn as_ref(&self) -> &[u8] {
1324 self.as_bytes()
1325 }
1326}
1327
/// Panics with `$err` unless `T::get_physical_type()` matches one of the
/// given patterns.
///
/// The macro expects a type named `T` to be in scope at the call site.
macro_rules! ensure_phys_ty {
    ($($ty:pat_param)|+ , $err: literal) => {
        match T::get_physical_type() {
            $($ty => {})*
            _ => panic!($err),
        };
    }
}
1337
#[cfg(test)]
mod tests {
    use super::*;

    /// `as_bytes` exposes the raw bytes of each value type.
    #[test]
    fn test_as_bytes() {
        // The expected bytes assume a little-endian target.
        let int96 = Int96::from(vec![1, 2, 3]);
        assert_eq!(int96.as_bytes(), &[1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]);

        let byte_array = ByteArray::from(vec![1, 2, 3]);
        assert_eq!(byte_array.as_bytes(), &[1, 2, 3]);

        // Integer-backed decimals store big-endian bytes.
        let from_i32 = Decimal::from_i32(123, 5, 2);
        assert_eq!(from_i32.as_bytes(), &[0, 0, 0, 123]);
        let from_i64 = Decimal::from_i64(123, 5, 2);
        assert_eq!(from_i64.as_bytes(), &[0, 0, 0, 0, 0, 0, 0, 123]);
        let from_bytes = Decimal::from_bytes(ByteArray::from(vec![1, 2, 3]), 5, 2);
        assert_eq!(from_bytes.as_bytes(), &[1, 2, 3]);
    }

    /// Building an `Int96` from a vector keeps the words in order.
    #[test]
    fn test_int96_from() {
        let int96 = Int96::from(vec![1, 12345, 1234567890]);
        assert_eq!(int96.data(), &[1, 12345, 1234567890]);
    }

    /// Every `From` conversion yields the expected payload.
    #[test]
    fn test_byte_array_from() {
        assert_eq!(ByteArray::from(b"ABC".to_vec()).data(), b"ABC");
        assert_eq!(ByteArray::from("ABC").data(), b"ABC");

        let from_bytes = ByteArray::from(Bytes::from(vec![1u8, 2u8, 3u8, 4u8, 5u8]));
        assert_eq!(from_bytes.data(), &[1u8, 2u8, 3u8, 4u8, 5u8]);

        let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8];
        assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
    }

    /// Decimal equality compares precision, scale and raw bytes.
    #[test]
    fn test_decimal_partial_eq() {
        assert_eq!(Decimal::default(), Decimal::from_i32(0, 0, 0));
        assert_eq!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 2));
        assert_eq!(
            Decimal::from_bytes(ByteArray::from(vec![0, 0, 0, 3]), 5, 2),
            Decimal::from_i32(3, 5, 2)
        );

        assert_ne!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(111, 5, 2));
        assert_ne!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 6, 2));
        assert_ne!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 3));

        // Same number, different byte width.
        assert_ne!(Decimal::from_i64(222, 5, 2), Decimal::from_i32(222, 5, 2));
    }

    /// Byte arrays order lexicographically by their bytes.
    #[test]
    fn test_byte_array_ord() {
        let base = ByteArray::from(vec![1, 2, 3]);
        let same_as_base = ByteArray::from(vec![1, 2, 3]);
        let shorter_but_larger = ByteArray::from(vec![3, 4]);
        let larger_last_byte = ByteArray::from(vec![1, 2, 4]);
        let empty = ByteArray::from(vec![]);
        let larger_first_byte = ByteArray::from(vec![2, 2, 3]);

        assert!(base < shorter_but_larger);
        assert!(larger_last_byte > base);
        assert!(base > empty);
        assert_eq!(base, same_as_base);
        assert!(larger_first_byte > base);
    }
}