1use bytes::Bytes;
21use half::f16;
22use std::cmp::Ordering;
23use std::fmt;
24use std::mem;
25use std::ops::{Deref, DerefMut};
26use std::str::from_utf8;
27
28use crate::basic::Type;
29use crate::column::reader::{ColumnReader, ColumnReaderImpl};
30use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
31use crate::errors::{ParquetError, Result};
32use crate::util::bit_util::FromBytes;
33
/// Rust representation of the Parquet `INT96` physical type.
///
/// The value is kept as three 32-bit words; see `Int96::data` for their layout.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct Int96 {
    value: [u32; 3],
}
40
/// Julian day number of the Unix epoch (1970-01-01).
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;

/// Seconds in one day.
const SECONDS_IN_DAY: i64 = 86_400;
/// Milliseconds in one second.
const MILLISECONDS: i64 = 1_000;
/// Microseconds in one second.
const MICROSECONDS: i64 = 1_000_000;
/// Nanoseconds in one second.
const NANOSECONDS: i64 = 1_000_000_000;

/// Milliseconds in one day.
const MILLISECONDS_IN_DAY: i64 = MILLISECONDS * SECONDS_IN_DAY;
/// Microseconds in one day.
const MICROSECONDS_IN_DAY: i64 = MICROSECONDS * SECONDS_IN_DAY;
/// Nanoseconds in one day.
const NANOSECONDS_IN_DAY: i64 = NANOSECONDS * SECONDS_IN_DAY;
58
59impl Int96 {
60 pub fn new() -> Self {
62 Self { value: [0; 3] }
63 }
64
65 #[inline]
67 pub fn data(&self) -> &[u32] {
68 &self.value
69 }
70
71 #[inline]
73 pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) {
74 self.value = [elem0, elem1, elem2];
75 }
76
77 #[inline]
81 pub fn to_seconds(&self) -> i64 {
82 let (day, nanos) = self.data_as_days_and_nanos();
83 (day as i64 - JULIAN_DAY_OF_EPOCH)
84 .wrapping_mul(SECONDS_IN_DAY)
85 .wrapping_add(nanos / 1_000_000_000)
86 }
87
88 #[inline]
92 pub fn to_millis(&self) -> i64 {
93 let (day, nanos) = self.data_as_days_and_nanos();
94 (day as i64 - JULIAN_DAY_OF_EPOCH)
95 .wrapping_mul(MILLISECONDS_IN_DAY)
96 .wrapping_add(nanos / 1_000_000)
97 }
98
99 #[inline]
103 pub fn to_micros(&self) -> i64 {
104 let (day, nanos) = self.data_as_days_and_nanos();
105 (day as i64 - JULIAN_DAY_OF_EPOCH)
106 .wrapping_mul(MICROSECONDS_IN_DAY)
107 .wrapping_add(nanos / 1_000)
108 }
109
110 #[inline]
114 pub fn to_nanos(&self) -> i64 {
115 let (day, nanos) = self.data_as_days_and_nanos();
116 (day as i64 - JULIAN_DAY_OF_EPOCH)
117 .wrapping_mul(NANOSECONDS_IN_DAY)
118 .wrapping_add(nanos)
119 }
120
121 #[inline]
122 fn get_days(&self) -> i32 {
123 self.data()[2] as i32
124 }
125
126 #[inline]
127 fn get_nanos(&self) -> i64 {
128 ((self.data()[1] as i64) << 32) + self.data()[0] as i64
129 }
130
131 #[inline]
132 fn data_as_days_and_nanos(&self) -> (i32, i64) {
133 (self.get_days(), self.get_nanos())
134 }
135}
136
137impl PartialOrd for Int96 {
138 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
139 Some(self.cmp(other))
140 }
141}
142
143impl Ord for Int96 {
144 fn cmp(&self, other: &Self) -> Ordering {
153 match self.get_days().cmp(&other.get_days()) {
154 Ordering::Equal => self.get_nanos().cmp(&other.get_nanos()),
155 ord => ord,
156 }
157 }
158}
159impl From<Vec<u32>> for Int96 {
160 fn from(buf: Vec<u32>) -> Self {
161 assert_eq!(buf.len(), 3);
162 let mut result = Self::new();
163 result.set_data(buf[0], buf[1], buf[2]);
164 result
165 }
166}
167
168impl fmt::Display for Int96 {
169 #[cold]
170 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
171 write!(f, "{:?}", self.data())
172 }
173}
174
175#[derive(Clone, Default)]
178pub struct ByteArray {
179 data: Option<Bytes>,
180}
181
182impl std::fmt::Debug for ByteArray {
184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185 let mut debug_struct = f.debug_struct("ByteArray");
186 match self.as_utf8() {
187 Ok(s) => debug_struct.field("data", &s),
188 Err(_) => debug_struct.field("data", &self.data),
189 };
190 debug_struct.finish()
191 }
192}
193
194impl PartialOrd for ByteArray {
195 fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
196 match (&self.data, &other.data) {
201 (None, None) => Some(Ordering::Equal),
202 (None, Some(_)) => Some(Ordering::Less),
203 (Some(_), None) => Some(Ordering::Greater),
204 (Some(self_data), Some(other_data)) => {
205 self_data.partial_cmp(&other_data)
207 }
208 }
209 }
210}
211
212impl ByteArray {
213 #[inline]
215 pub fn new() -> Self {
216 ByteArray { data: None }
217 }
218
219 #[inline]
221 pub fn len(&self) -> usize {
222 assert!(self.data.is_some());
223 self.data.as_ref().unwrap().len()
224 }
225
226 #[inline]
228 pub fn is_empty(&self) -> bool {
229 self.len() == 0
230 }
231
232 #[inline]
234 pub fn data(&self) -> &[u8] {
235 self.data
236 .as_ref()
237 .expect("set_data should have been called")
238 .as_ref()
239 }
240
241 #[inline]
243 pub fn set_data(&mut self, data: Bytes) {
244 self.data = Some(data);
245 }
246
247 #[inline]
249 pub fn slice(&self, start: usize, len: usize) -> Self {
250 Self::from(
251 self.data
252 .as_ref()
253 .expect("set_data should have been called")
254 .slice(start..start + len),
255 )
256 }
257
258 pub fn as_utf8(&self) -> Result<&str> {
260 self.data
261 .as_ref()
262 .map(|ptr| ptr.as_ref())
263 .ok_or_else(|| general_err!("Can't convert empty byte array to utf8"))
264 .and_then(|bytes| from_utf8(bytes).map_err(|e| e.into()))
265 }
266}
267
268impl From<Vec<u8>> for ByteArray {
269 fn from(buf: Vec<u8>) -> ByteArray {
270 Self {
271 data: Some(buf.into()),
272 }
273 }
274}
275
276impl<'a> From<&'a [u8]> for ByteArray {
277 fn from(b: &'a [u8]) -> ByteArray {
278 let mut v = Vec::new();
279 v.extend_from_slice(b);
280 Self {
281 data: Some(v.into()),
282 }
283 }
284}
285
286impl<'a> From<&'a str> for ByteArray {
287 fn from(s: &'a str) -> ByteArray {
288 let mut v = Vec::new();
289 v.extend_from_slice(s.as_bytes());
290 Self {
291 data: Some(v.into()),
292 }
293 }
294}
295
296impl From<Bytes> for ByteArray {
297 fn from(value: Bytes) -> Self {
298 Self { data: Some(value) }
299 }
300}
301
302impl From<f16> for ByteArray {
303 fn from(value: f16) -> Self {
304 Self::from(value.to_le_bytes().as_slice())
305 }
306}
307
308impl PartialEq for ByteArray {
309 fn eq(&self, other: &ByteArray) -> bool {
310 match (&self.data, &other.data) {
311 (Some(d1), Some(d2)) => d1.as_ref() == d2.as_ref(),
312 (None, None) => true,
313 _ => false,
314 }
315 }
316}
317
318impl fmt::Display for ByteArray {
319 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
320 write!(f, "{:?}", self.data())
321 }
322}
323
324#[repr(transparent)]
339#[derive(Clone, Debug, Default)]
340pub struct FixedLenByteArray(ByteArray);
341
342impl PartialEq for FixedLenByteArray {
343 fn eq(&self, other: &FixedLenByteArray) -> bool {
344 self.0.eq(&other.0)
345 }
346}
347
348impl PartialEq<ByteArray> for FixedLenByteArray {
349 fn eq(&self, other: &ByteArray) -> bool {
350 self.0.eq(other)
351 }
352}
353
354impl PartialEq<FixedLenByteArray> for ByteArray {
355 fn eq(&self, other: &FixedLenByteArray) -> bool {
356 self.eq(&other.0)
357 }
358}
359
360impl fmt::Display for FixedLenByteArray {
361 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
362 self.0.fmt(f)
363 }
364}
365
366impl PartialOrd for FixedLenByteArray {
367 fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
368 self.0.partial_cmp(&other.0)
369 }
370}
371
372impl PartialOrd<FixedLenByteArray> for ByteArray {
373 fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
374 self.partial_cmp(&other.0)
375 }
376}
377
378impl PartialOrd<ByteArray> for FixedLenByteArray {
379 fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
380 self.0.partial_cmp(other)
381 }
382}
383
384impl Deref for FixedLenByteArray {
385 type Target = ByteArray;
386
387 fn deref(&self) -> &Self::Target {
388 &self.0
389 }
390}
391
392impl DerefMut for FixedLenByteArray {
393 fn deref_mut(&mut self) -> &mut Self::Target {
394 &mut self.0
395 }
396}
397
398impl From<ByteArray> for FixedLenByteArray {
399 fn from(other: ByteArray) -> Self {
400 Self(other)
401 }
402}
403
404impl From<Vec<u8>> for FixedLenByteArray {
405 fn from(buf: Vec<u8>) -> FixedLenByteArray {
406 FixedLenByteArray(ByteArray::from(buf))
407 }
408}
409
410impl From<FixedLenByteArray> for ByteArray {
411 fn from(other: FixedLenByteArray) -> Self {
412 other.0
413 }
414}
415
416#[derive(Clone, Debug)]
422pub enum Decimal {
423 Int32 {
425 value: [u8; 4],
427 precision: i32,
429 scale: i32,
431 },
432 Int64 {
434 value: [u8; 8],
436 precision: i32,
438 scale: i32,
440 },
441 Bytes {
443 value: ByteArray,
445 precision: i32,
447 scale: i32,
449 },
450}
451
452impl Decimal {
453 pub fn from_i32(value: i32, precision: i32, scale: i32) -> Self {
455 let bytes = value.to_be_bytes();
456 Decimal::Int32 {
457 value: bytes,
458 precision,
459 scale,
460 }
461 }
462
463 pub fn from_i64(value: i64, precision: i32, scale: i32) -> Self {
465 let bytes = value.to_be_bytes();
466 Decimal::Int64 {
467 value: bytes,
468 precision,
469 scale,
470 }
471 }
472
473 pub fn from_bytes(value: ByteArray, precision: i32, scale: i32) -> Self {
475 Decimal::Bytes {
476 value,
477 precision,
478 scale,
479 }
480 }
481
482 pub fn data(&self) -> &[u8] {
484 match *self {
485 Decimal::Int32 { ref value, .. } => value,
486 Decimal::Int64 { ref value, .. } => value,
487 Decimal::Bytes { ref value, .. } => value.data(),
488 }
489 }
490
491 pub fn precision(&self) -> i32 {
493 match *self {
494 Decimal::Int32 { precision, .. } => precision,
495 Decimal::Int64 { precision, .. } => precision,
496 Decimal::Bytes { precision, .. } => precision,
497 }
498 }
499
500 pub fn scale(&self) -> i32 {
502 match *self {
503 Decimal::Int32 { scale, .. } => scale,
504 Decimal::Int64 { scale, .. } => scale,
505 Decimal::Bytes { scale, .. } => scale,
506 }
507 }
508}
509
510impl Default for Decimal {
511 fn default() -> Self {
512 Self::from_i32(0, 0, 0)
513 }
514}
515
516impl PartialEq for Decimal {
517 fn eq(&self, other: &Decimal) -> bool {
518 self.precision() == other.precision()
519 && self.scale() == other.scale()
520 && self.data() == other.data()
521 }
522}
523
/// Exposes the in-memory byte representation of a value.
pub trait AsBytes {
    /// Returns the bytes backing `self`.
    fn as_bytes(&self) -> &[u8];
}

/// Exposes a slice of values as a slice of their bytes.
pub trait SliceAsBytes: Sized {
    /// Returns the bytes backing `self_`.
    fn slice_as_bytes(self_: &[Self]) -> &[u8];

    /// Returns a mutable byte view over `self_`.
    ///
    /// # Safety
    ///
    /// Callers must only write byte patterns that leave every element of `self_`
    /// a valid value of `Self`.
    unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8];
}

impl AsBytes for [u8] {
    fn as_bytes(&self) -> &[u8] {
        &self[..]
    }
}
547
548macro_rules! gen_as_bytes {
549 ($source_ty:ident) => {
550 impl AsBytes for $source_ty {
551 #[allow(clippy::size_of_in_element_count)]
552 fn as_bytes(&self) -> &[u8] {
553 unsafe {
556 std::slice::from_raw_parts(
557 self as *const $source_ty as *const u8,
558 std::mem::size_of::<$source_ty>(),
559 )
560 }
561 }
562 }
563
564 impl SliceAsBytes for $source_ty {
565 #[inline]
566 #[allow(clippy::size_of_in_element_count)]
567 fn slice_as_bytes(self_: &[Self]) -> &[u8] {
568 unsafe {
571 std::slice::from_raw_parts(
572 self_.as_ptr() as *const u8,
573 std::mem::size_of_val(self_),
574 )
575 }
576 }
577
578 #[inline]
579 #[allow(clippy::size_of_in_element_count)]
580 unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] {
581 unsafe {
585 std::slice::from_raw_parts_mut(
586 self_.as_mut_ptr() as *mut u8,
587 std::mem::size_of_val(self_),
588 )
589 }
590 }
591 }
592 };
593}
594
595gen_as_bytes!(i8);
596gen_as_bytes!(i16);
597gen_as_bytes!(i32);
598gen_as_bytes!(i64);
599gen_as_bytes!(u8);
600gen_as_bytes!(u16);
601gen_as_bytes!(u32);
602gen_as_bytes!(u64);
603gen_as_bytes!(f32);
604gen_as_bytes!(f64);
605
606macro_rules! unimplemented_slice_as_bytes {
607 ($ty: ty) => {
608 impl SliceAsBytes for $ty {
609 fn slice_as_bytes(_self: &[Self]) -> &[u8] {
610 unimplemented!()
611 }
612
613 unsafe fn slice_as_bytes_mut(_self: &mut [Self]) -> &mut [u8] {
614 unimplemented!()
615 }
616 }
617 };
618}
619
620unimplemented_slice_as_bytes!(Int96);
622unimplemented_slice_as_bytes!(bool);
623unimplemented_slice_as_bytes!(ByteArray);
624unimplemented_slice_as_bytes!(FixedLenByteArray);
625
626impl AsBytes for bool {
627 fn as_bytes(&self) -> &[u8] {
628 unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) }
631 }
632}
633
634impl AsBytes for Int96 {
635 fn as_bytes(&self) -> &[u8] {
636 unsafe { std::slice::from_raw_parts(self.data() as *const [u32] as *const u8, 12) }
638 }
639}
640
641impl AsBytes for ByteArray {
642 fn as_bytes(&self) -> &[u8] {
643 self.data()
644 }
645}
646
647impl AsBytes for FixedLenByteArray {
648 fn as_bytes(&self) -> &[u8] {
649 self.data()
650 }
651}
652
653impl AsBytes for Decimal {
654 fn as_bytes(&self) -> &[u8] {
655 self.data()
656 }
657}
658
659impl AsBytes for Vec<u8> {
660 fn as_bytes(&self) -> &[u8] {
661 self.as_slice()
662 }
663}
664
665impl AsBytes for &str {
666 fn as_bytes(&self) -> &[u8] {
667 (self as &str).as_bytes()
668 }
669}
670
671impl AsBytes for str {
672 fn as_bytes(&self) -> &[u8] {
673 (self as &str).as_bytes()
674 }
675}
676
677pub(crate) mod private {
678 use bytes::Bytes;
679
680 use crate::encodings::decoding::PlainDecoderDetails;
681 use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter};
682
683 use super::{ParquetError, Result, SliceAsBytes};
684 use crate::basic::Type;
685 use crate::file::metadata::HeapSize;
686
687 pub trait ParquetValueType:
693 PartialEq
694 + std::fmt::Debug
695 + std::fmt::Display
696 + Default
697 + Clone
698 + super::AsBytes
699 + super::FromBytes
700 + SliceAsBytes
701 + PartialOrd
702 + Send
703 + HeapSize
704 + crate::encodings::decoding::private::GetDecoder
705 + crate::file::statistics::private::MakeStatistics
706 {
707 const PHYSICAL_TYPE: Type;
708
709 fn encode<W: std::io::Write>(
711 values: &[Self],
712 writer: &mut W,
713 bit_writer: &mut BitWriter,
714 ) -> Result<()>;
715
716 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize);
718
719 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize>;
721
722 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize>;
723
724 fn dict_encoding_size(&self) -> (usize, usize) {
726 (std::mem::size_of::<Self>(), 1)
727 }
728
729 fn variable_length_bytes(_: &[Self]) -> Option<i64> {
733 None
734 }
735
736 fn as_i64(&self) -> Result<i64> {
741 Err(general_err!("Type cannot be converted to i64"))
742 }
743
744 fn as_u64(&self) -> Result<u64> {
749 self.as_i64()
750 .map_err(|_| general_err!("Type cannot be converted to u64"))
751 .map(|x| x as u64)
752 }
753
754 fn as_any(&self) -> &dyn std::any::Any;
756
757 fn as_mut_any(&mut self) -> &mut dyn std::any::Any;
759
760 fn set_from_bytes(&mut self, _data: Bytes) {
764 unimplemented!();
765 }
766 }
767
768 impl ParquetValueType for bool {
769 const PHYSICAL_TYPE: Type = Type::BOOLEAN;
770
771 #[inline]
772 fn encode<W: std::io::Write>(
773 values: &[Self],
774 _: &mut W,
775 bit_writer: &mut BitWriter,
776 ) -> Result<()> {
777 for value in values {
778 bit_writer.put_value(*value as u64, 1)
779 }
780 Ok(())
781 }
782
783 #[inline]
784 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
785 decoder.bit_reader.replace(BitReader::new(data));
786 decoder.num_values = num_values;
787 }
788
789 #[inline]
790 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
791 let bit_reader = decoder.bit_reader.as_mut().unwrap();
792 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
793 let values_read = bit_reader.get_batch(&mut buffer[..num_values], 1);
794 decoder.num_values -= values_read;
795 Ok(values_read)
796 }
797
798 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
799 let bit_reader = decoder.bit_reader.as_mut().unwrap();
800 let num_values = std::cmp::min(num_values, decoder.num_values);
801 let values_read = bit_reader.skip(num_values, 1);
802 decoder.num_values -= values_read;
803 Ok(values_read)
804 }
805
806 #[inline]
807 fn as_i64(&self) -> Result<i64> {
808 Ok(*self as i64)
809 }
810
811 #[inline]
812 fn as_any(&self) -> &dyn std::any::Any {
813 self
814 }
815
816 #[inline]
817 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
818 self
819 }
820 }
821
822 macro_rules! impl_from_raw {
823 ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
824 impl ParquetValueType for $ty {
825 const PHYSICAL_TYPE: Type = $physical_ty;
826
827 #[inline]
828 fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
829 let raw = unsafe {
831 std::slice::from_raw_parts(
832 values.as_ptr() as *const u8,
833 std::mem::size_of_val(values),
834 )
835 };
836 writer.write_all(raw)?;
837
838 Ok(())
839 }
840
841 #[inline]
842 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
843 decoder.data.replace(data);
844 decoder.start = 0;
845 decoder.num_values = num_values;
846 }
847
848 #[inline]
849 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
850 let data = decoder.data.as_ref().expect("set_data should have been called");
851 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
852 let bytes_left = data.len() - decoder.start;
853 let bytes_to_decode = std::mem::size_of::<Self>() * num_values;
854
855 if bytes_left < bytes_to_decode {
856 return Err(eof_err!("Not enough bytes to decode"));
857 }
858
859 {
860 let raw_buffer = &mut unsafe { Self::slice_as_bytes_mut(buffer) }[..bytes_to_decode];
863 raw_buffer.copy_from_slice(data.slice(
864 decoder.start..decoder.start + bytes_to_decode
865 ).as_ref());
866 };
867 decoder.start += bytes_to_decode;
868 decoder.num_values -= num_values;
869
870 Ok(num_values)
871 }
872
873 #[inline]
874 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
875 let data = decoder.data.as_ref().expect("set_data should have been called");
876 let num_values = num_values.min(decoder.num_values);
877 let bytes_left = data.len() - decoder.start;
878 let bytes_to_skip = std::mem::size_of::<Self>() * num_values;
879
880 if bytes_left < bytes_to_skip {
881 return Err(eof_err!("Not enough bytes to skip"));
882 }
883
884 decoder.start += bytes_to_skip;
885 decoder.num_values -= num_values;
886
887 Ok(num_values)
888 }
889
890 #[inline]
891 fn as_i64(&$self) -> Result<i64> {
892 $as_i64
893 }
894
895 #[inline]
896 fn as_any(&self) -> &dyn std::any::Any {
897 self
898 }
899
900 #[inline]
901 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
902 self
903 }
904 }
905 }
906 }
907
908 impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
909 impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
910 impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
911 impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
912
913 impl ParquetValueType for super::Int96 {
914 const PHYSICAL_TYPE: Type = Type::INT96;
915
916 #[inline]
917 fn encode<W: std::io::Write>(
918 values: &[Self],
919 writer: &mut W,
920 _: &mut BitWriter,
921 ) -> Result<()> {
922 for value in values {
923 let raw = SliceAsBytes::slice_as_bytes(value.data());
924 writer.write_all(raw)?;
925 }
926 Ok(())
927 }
928
929 #[inline]
930 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
931 decoder.data.replace(data);
932 decoder.start = 0;
933 decoder.num_values = num_values;
934 }
935
936 #[inline]
937 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
938 let data = decoder
940 .data
941 .as_ref()
942 .expect("set_data should have been called");
943 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
944 let bytes_left = data.len() - decoder.start;
945 let bytes_to_decode = 12 * num_values;
946
947 if bytes_left < bytes_to_decode {
948 return Err(eof_err!("Not enough bytes to decode"));
949 }
950
951 let data_range = data.slice(decoder.start..decoder.start + bytes_to_decode);
952 let bytes: &[u8] = &data_range;
953 decoder.start += bytes_to_decode;
954
955 let mut pos = 0; for item in buffer.iter_mut().take(num_values) {
957 let elem0 = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
958 let elem1 = u32::from_le_bytes(bytes[pos + 4..pos + 8].try_into().unwrap());
959 let elem2 = u32::from_le_bytes(bytes[pos + 8..pos + 12].try_into().unwrap());
960
961 item.set_data(elem0, elem1, elem2);
962 pos += 12;
963 }
964 decoder.num_values -= num_values;
965
966 Ok(num_values)
967 }
968
969 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
970 let data = decoder
971 .data
972 .as_ref()
973 .expect("set_data should have been called");
974 let num_values = std::cmp::min(num_values, decoder.num_values);
975 let bytes_left = data.len() - decoder.start;
976 let bytes_to_skip = 12 * num_values;
977
978 if bytes_left < bytes_to_skip {
979 return Err(eof_err!("Not enough bytes to skip"));
980 }
981 decoder.start += bytes_to_skip;
982 decoder.num_values -= num_values;
983
984 Ok(num_values)
985 }
986
987 #[inline]
988 fn as_any(&self) -> &dyn std::any::Any {
989 self
990 }
991
992 #[inline]
993 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
994 self
995 }
996 }
997
998 impl HeapSize for super::Int96 {
999 fn heap_size(&self) -> usize {
1000 0 }
1002 }
1003
1004 impl ParquetValueType for super::ByteArray {
1005 const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
1006
1007 #[inline]
1008 fn encode<W: std::io::Write>(
1009 values: &[Self],
1010 writer: &mut W,
1011 _: &mut BitWriter,
1012 ) -> Result<()> {
1013 for value in values {
1014 let len: u32 = value.len().try_into().unwrap();
1015 writer.write_all(&len.to_ne_bytes())?;
1016 let raw = value.data();
1017 writer.write_all(raw)?;
1018 }
1019 Ok(())
1020 }
1021
1022 #[inline]
1023 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1024 decoder.data.replace(data);
1025 decoder.start = 0;
1026 decoder.num_values = num_values;
1027 }
1028
1029 #[inline]
1030 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1031 let data = decoder
1032 .data
1033 .as_mut()
1034 .expect("set_data should have been called");
1035 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1036 for val_array in buffer.iter_mut().take(num_values) {
1037 let len: usize =
1038 read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1039 decoder.start += std::mem::size_of::<u32>();
1040
1041 if data.len() < decoder.start + len {
1042 return Err(eof_err!("Not enough bytes to decode"));
1043 }
1044
1045 val_array.set_data(data.slice(decoder.start..decoder.start + len));
1046 decoder.start += len;
1047 }
1048 decoder.num_values -= num_values;
1049
1050 Ok(num_values)
1051 }
1052
1053 fn variable_length_bytes(values: &[Self]) -> Option<i64> {
1054 Some(values.iter().map(|x| x.len() as i64).sum())
1055 }
1056
1057 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1058 let data = decoder
1059 .data
1060 .as_mut()
1061 .expect("set_data should have been called");
1062 let num_values = num_values.min(decoder.num_values);
1063
1064 for _ in 0..num_values {
1065 let len: usize =
1066 read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1067 decoder.start += std::mem::size_of::<u32>() + len;
1068 }
1069 decoder.num_values -= num_values;
1070
1071 Ok(num_values)
1072 }
1073
1074 #[inline]
1075 fn dict_encoding_size(&self) -> (usize, usize) {
1076 (std::mem::size_of::<u32>(), self.len())
1077 }
1078
1079 #[inline]
1080 fn as_any(&self) -> &dyn std::any::Any {
1081 self
1082 }
1083
1084 #[inline]
1085 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1086 self
1087 }
1088
1089 #[inline]
1090 fn set_from_bytes(&mut self, data: Bytes) {
1091 self.set_data(data);
1092 }
1093 }
1094
1095 impl HeapSize for super::ByteArray {
1096 fn heap_size(&self) -> usize {
1097 self.data.as_ref().map(|data| data.len()).unwrap_or(0)
1101 }
1102 }
1103
1104 impl ParquetValueType for super::FixedLenByteArray {
1105 const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
1106
1107 #[inline]
1108 fn encode<W: std::io::Write>(
1109 values: &[Self],
1110 writer: &mut W,
1111 _: &mut BitWriter,
1112 ) -> Result<()> {
1113 for value in values {
1114 let raw = value.data();
1115 writer.write_all(raw)?;
1116 }
1117 Ok(())
1118 }
1119
1120 #[inline]
1121 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1122 decoder.data.replace(data);
1123 decoder.start = 0;
1124 decoder.num_values = num_values;
1125 }
1126
1127 #[inline]
1128 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1129 assert!(decoder.type_length > 0);
1130
1131 let data = decoder
1132 .data
1133 .as_mut()
1134 .expect("set_data should have been called");
1135 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1136
1137 for item in buffer.iter_mut().take(num_values) {
1138 let len = decoder.type_length as usize;
1139
1140 if data.len() < decoder.start + len {
1141 return Err(eof_err!("Not enough bytes to decode"));
1142 }
1143
1144 item.set_data(data.slice(decoder.start..decoder.start + len));
1145 decoder.start += len;
1146 }
1147 decoder.num_values -= num_values;
1148
1149 Ok(num_values)
1150 }
1151
1152 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1153 assert!(decoder.type_length > 0);
1154
1155 let data = decoder
1156 .data
1157 .as_mut()
1158 .expect("set_data should have been called");
1159 let num_values = std::cmp::min(num_values, decoder.num_values);
1160 for _ in 0..num_values {
1161 let len = decoder.type_length as usize;
1162
1163 if data.len() < decoder.start + len {
1164 return Err(eof_err!("Not enough bytes to skip"));
1165 }
1166
1167 decoder.start += len;
1168 }
1169 decoder.num_values -= num_values;
1170
1171 Ok(num_values)
1172 }
1173
1174 #[inline]
1175 fn dict_encoding_size(&self) -> (usize, usize) {
1176 (std::mem::size_of::<u32>(), self.len())
1177 }
1178
1179 #[inline]
1180 fn as_any(&self) -> &dyn std::any::Any {
1181 self
1182 }
1183
1184 #[inline]
1185 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1186 self
1187 }
1188
1189 #[inline]
1190 fn set_from_bytes(&mut self, data: Bytes) {
1191 self.set_data(data);
1192 }
1193 }
1194
1195 impl HeapSize for super::FixedLenByteArray {
1196 fn heap_size(&self) -> usize {
1197 self.0.heap_size()
1198 }
1199 }
1200}
1201
1202pub trait DataType: 'static + Send {
1205 type T: private::ParquetValueType;
1207
1208 fn get_physical_type() -> Type {
1210 <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
1211 }
1212
1213 fn get_type_size() -> usize;
1215
1216 fn get_column_reader(column_writer: ColumnReader) -> Option<ColumnReaderImpl<Self>>
1218 where
1219 Self: Sized;
1220
1221 fn get_column_writer(column_writer: ColumnWriter<'_>) -> Option<ColumnWriterImpl<'_, Self>>
1223 where
1224 Self: Sized;
1225
1226 fn get_column_writer_ref<'a, 'b: 'a>(
1228 column_writer: &'b ColumnWriter<'a>,
1229 ) -> Option<&'b ColumnWriterImpl<'a, Self>>
1230 where
1231 Self: Sized;
1232
1233 fn get_column_writer_mut<'a, 'b: 'a>(
1235 column_writer: &'a mut ColumnWriter<'b>,
1236 ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
1237 where
1238 Self: Sized;
1239}
1240
1241macro_rules! make_type {
1242 ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
1243 #[doc = concat!("Parquet physical type: ", stringify!($name))]
1244 #[derive(Clone)]
1245 pub struct $name {}
1246
1247 impl DataType for $name {
1248 type T = $native_ty;
1249
1250 fn get_type_size() -> usize {
1251 $size
1252 }
1253
1254 fn get_column_reader(column_reader: ColumnReader) -> Option<ColumnReaderImpl<Self>> {
1255 match column_reader {
1256 ColumnReader::$reader_ident(w) => Some(w),
1257 _ => None,
1258 }
1259 }
1260
1261 fn get_column_writer(
1262 column_writer: ColumnWriter<'_>,
1263 ) -> Option<ColumnWriterImpl<'_, Self>> {
1264 match column_writer {
1265 ColumnWriter::$writer_ident(w) => Some(w),
1266 _ => None,
1267 }
1268 }
1269
1270 fn get_column_writer_ref<'a, 'b: 'a>(
1271 column_writer: &'a ColumnWriter<'b>,
1272 ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
1273 match column_writer {
1274 ColumnWriter::$writer_ident(w) => Some(w),
1275 _ => None,
1276 }
1277 }
1278
1279 fn get_column_writer_mut<'a, 'b: 'a>(
1280 column_writer: &'a mut ColumnWriter<'b>,
1281 ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
1282 match column_writer {
1283 ColumnWriter::$writer_ident(w) => Some(w),
1284 _ => None,
1285 }
1286 }
1287 }
1288 };
1289}
1290
1291make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
1294make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
1295make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
1296make_type!(
1297 Int96Type,
1298 Int96ColumnReader,
1299 Int96ColumnWriter,
1300 Int96,
1301 mem::size_of::<Int96>()
1302);
1303make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
1304make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
1305make_type!(
1306 ByteArrayType,
1307 ByteArrayColumnReader,
1308 ByteArrayColumnWriter,
1309 ByteArray,
1310 mem::size_of::<ByteArray>()
1311);
1312make_type!(
1313 FixedLenByteArrayType,
1314 FixedLenByteArrayColumnReader,
1315 FixedLenByteArrayColumnWriter,
1316 FixedLenByteArray,
1317 mem::size_of::<FixedLenByteArray>()
1318);
1319
1320impl AsRef<[u8]> for ByteArray {
1321 fn as_ref(&self) -> &[u8] {
1322 self.as_bytes()
1323 }
1324}
1325
1326impl AsRef<[u8]> for FixedLenByteArray {
1327 fn as_ref(&self) -> &[u8] {
1328 self.as_bytes()
1329 }
1330}
1331
// Panics with `$err` unless `T::get_physical_type()` matches one of the given
// patterns. Must be expanded where a type parameter `T: DataType` is in scope.
macro_rules! ensure_phys_ty {
    ($($ty:pat_param)|+ , $err: literal) => {
        match T::get_physical_type() {
            $($ty)|+ => (),
            _ => panic!($err),
        };
    }
}
1341
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_as_bytes() {
        let i96 = Int96::from(vec![1, 2, 3]);
        assert_eq!(i96.as_bytes(), &[1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]);

        let ba = ByteArray::from(vec![1, 2, 3]);
        assert_eq!(ba.as_bytes(), &[1, 2, 3]);

        let decimal = Decimal::from_i32(123, 5, 2);
        assert_eq!(decimal.as_bytes(), &[0, 0, 0, 123]);
        let decimal = Decimal::from_i64(123, 5, 2);
        assert_eq!(decimal.as_bytes(), &[0, 0, 0, 0, 0, 0, 0, 123]);
        let decimal = Decimal::from_bytes(ByteArray::from(vec![1, 2, 3]), 5, 2);
        assert_eq!(decimal.as_bytes(), &[1, 2, 3]);
    }

    #[test]
    fn test_int96_from() {
        assert_eq!(
            Int96::from(vec![1, 12345, 1234567890]).data(),
            &[1, 12345, 1234567890]
        );
    }

    #[test]
    fn test_int96_to_timestamp_units() {
        // Julian day 2_440_588 is the Unix epoch.
        let epoch = Int96::from(vec![0, 0, 2_440_588]);
        assert_eq!(epoch.to_seconds(), 0);
        assert_eq!(epoch.to_nanos(), 0);

        // One day plus 1.5 seconds after the epoch; sub-unit precision truncates.
        let i96 = Int96::from(vec![1_500_000_000, 0, 2_440_589]);
        assert_eq!(i96.to_seconds(), 86_401);
        assert_eq!(i96.to_millis(), 86_401_500);
        assert_eq!(i96.to_micros(), 86_401_500_000);
        assert_eq!(i96.to_nanos(), 86_401_500_000_000);

        // One day before the epoch yields a negative offset.
        let before = Int96::from(vec![0, 0, 2_440_587]);
        assert_eq!(before.to_millis(), -86_400_000);
    }

    #[test]
    fn test_int96_ord() {
        // The day word dominates the comparison.
        assert!(Int96::from(vec![5, 0, 1]) < Int96::from(vec![0, 0, 2]));
        // Within a day, nanoseconds decide; word 1 carries the high 32 bits.
        assert!(Int96::from(vec![1, 0, 2]) < Int96::from(vec![2, 0, 2]));
        assert!(Int96::from(vec![u32::MAX, 0, 2]) < Int96::from(vec![0, 1, 2]));
        assert_eq!(
            Int96::from(vec![1, 2, 3]).cmp(&Int96::from(vec![1, 2, 3])),
            std::cmp::Ordering::Equal
        );
    }

    #[test]
    fn test_byte_array_from() {
        assert_eq!(ByteArray::from(b"ABC".to_vec()).data(), b"ABC");
        assert_eq!(ByteArray::from("ABC").data(), b"ABC");
        assert_eq!(
            ByteArray::from(Bytes::from(vec![1u8, 2u8, 3u8, 4u8, 5u8])).data(),
            &[1u8, 2u8, 3u8, 4u8, 5u8]
        );
        let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8];
        assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
    }

    #[test]
    fn test_byte_array_unset_data() {
        let unset = ByteArray::new();
        let empty = ByteArray::from(vec![]);
        // Unset data sorts before any set value, including an empty one.
        assert!(unset < empty);
        assert_eq!(
            unset.partial_cmp(&ByteArray::new()),
            Some(std::cmp::Ordering::Equal)
        );
        assert!(unset != empty);
        assert!(unset.as_utf8().is_err());
        assert_eq!(empty.as_utf8().unwrap(), "");
    }

    #[test]
    fn test_fixed_len_byte_array_interop() {
        let flba = FixedLenByteArray::from(vec![1u8, 2, 3]);
        let ba = ByteArray::from(vec![1u8, 2, 3]);
        assert!(flba == ba);
        assert!(ba == flba);
        assert_eq!(flba.len(), 3);
        assert_eq!(flba.as_bytes(), &[1, 2, 3]);
        assert!(FixedLenByteArray::from(vec![1u8, 2, 4]) > ba);
    }

    #[test]
    fn test_decimal_partial_eq() {
        assert_eq!(Decimal::default(), Decimal::from_i32(0, 0, 0));
        assert_eq!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 2));
        assert_eq!(
            Decimal::from_bytes(ByteArray::from(vec![0, 0, 0, 3]), 5, 2),
            Decimal::from_i32(3, 5, 2)
        );

        assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(111, 5, 2));
        assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(222, 6, 2));
        assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(222, 5, 3));

        assert!(Decimal::from_i64(222, 5, 2) != Decimal::from_i32(222, 5, 2));
    }

    #[test]
    fn test_byte_array_ord() {
        let ba1 = ByteArray::from(vec![1, 2, 3]);
        let ba11 = ByteArray::from(vec![1, 2, 3]);
        let ba2 = ByteArray::from(vec![3, 4]);
        let ba3 = ByteArray::from(vec![1, 2, 4]);
        let ba4 = ByteArray::from(vec![]);
        let ba5 = ByteArray::from(vec![2, 2, 3]);

        assert!(ba1 < ba2);
        assert!(ba3 > ba1);
        assert!(ba1 > ba4);
        assert_eq!(ba1, ba11);
        assert!(ba5 > ba1);
    }
}