1use bytes::Bytes;
21use half::f16;
22use std::cmp::Ordering;
23use std::fmt;
24use std::mem;
25use std::ops::{Deref, DerefMut};
26use std::str::from_utf8;
27
28use crate::basic::Type;
29use crate::column::reader::{ColumnReader, ColumnReaderImpl};
30use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
31use crate::errors::{ParquetError, Result};
32use crate::util::bit_util::FromBytes;
33
/// Rust representation of the legacy Parquet INT96 physical type.
///
/// The three 32-bit words are kept exactly as read: `value[0]` and `value[1]` are the
/// low and high halves of the nanoseconds within the day, and `value[2]` is the Julian
/// day number (see [`Int96::to_nanos`]).
///
/// NOTE: the derived `PartialOrd` compares the words lexicographically starting with
/// `value[0]`, which is not chronological order.
#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
pub struct Int96 {
    value: [u32; 3],
}

/// Julian day number of the Unix epoch (1970-01-01).
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;

/// Number of seconds in a day.
const SECONDS_IN_DAY: i64 = 86_400;
/// Number of milliseconds in a second.
const MILLISECONDS: i64 = 1_000;
/// Number of microseconds in a second.
const MICROSECONDS: i64 = 1_000_000;
/// Number of nanoseconds in a second.
const NANOSECONDS: i64 = 1_000_000_000;

/// Number of milliseconds in a day.
const MILLISECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MILLISECONDS;
/// Number of microseconds in a day.
const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * MICROSECONDS;
/// Number of nanoseconds in a day.
const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS;

impl Int96 {
    /// Creates an `Int96` with all three words set to zero.
    pub fn new() -> Self {
        Self { value: [0; 3] }
    }

    /// Returns the three underlying 32-bit words.
    #[inline]
    pub fn data(&self) -> &[u32] {
        &self.value
    }

    /// Overwrites the three underlying 32-bit words.
    #[inline]
    pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) {
        self.value = [elem0, elem1, elem2];
    }

    /// Converts to seconds since the Unix epoch.
    ///
    /// Sub-second nanoseconds are dropped by integer division. The day arithmetic
    /// wraps on overflow instead of panicking.
    #[inline]
    pub fn to_seconds(&self) -> i64 {
        let (day, nanos) = self.data_as_days_and_nanos();
        (day as i64 - JULIAN_DAY_OF_EPOCH)
            .wrapping_mul(SECONDS_IN_DAY)
            .wrapping_add(nanos / NANOSECONDS)
    }

    /// Converts to milliseconds since the Unix epoch (wrapping on overflow).
    #[inline]
    pub fn to_millis(&self) -> i64 {
        let (day, nanos) = self.data_as_days_and_nanos();
        (day as i64 - JULIAN_DAY_OF_EPOCH)
            .wrapping_mul(MILLISECONDS_IN_DAY)
            .wrapping_add(nanos / (NANOSECONDS / MILLISECONDS))
    }

    /// Converts to microseconds since the Unix epoch (wrapping on overflow).
    #[inline]
    pub fn to_micros(&self) -> i64 {
        let (day, nanos) = self.data_as_days_and_nanos();
        (day as i64 - JULIAN_DAY_OF_EPOCH)
            .wrapping_mul(MICROSECONDS_IN_DAY)
            .wrapping_add(nanos / (NANOSECONDS / MICROSECONDS))
    }

    /// Converts to nanoseconds since the Unix epoch (wrapping on overflow).
    #[inline]
    pub fn to_nanos(&self) -> i64 {
        let (day, nanos) = self.data_as_days_and_nanos();
        (day as i64 - JULIAN_DAY_OF_EPOCH)
            .wrapping_mul(NANOSECONDS_IN_DAY)
            .wrapping_add(nanos)
    }

    /// Splits the words into `(julian_day, nanoseconds_within_day)`.
    ///
    /// The day word is reinterpreted as `i32`. The nanoseconds are assembled from
    /// `value[1]` (high half) and `value[0]` (low half).
    #[inline]
    fn data_as_days_and_nanos(&self) -> (i32, i64) {
        let day = self.data()[2] as i32;
        let nanos = ((self.data()[1] as i64) << 32) + self.data()[0] as i64;
        (day, nanos)
    }
}

impl From<Vec<u32>> for Int96 {
    /// Builds an `Int96` from exactly three words.
    ///
    /// # Panics
    /// If `buf` does not contain exactly three elements.
    fn from(buf: Vec<u32>) -> Self {
        assert_eq!(buf.len(), 3);
        let mut result = Self::new();
        result.set_data(buf[0], buf[1], buf[2]);
        result
    }
}

impl fmt::Display for Int96 {
    /// Formats the three raw words as a debug list, e.g. `[1, 2, 3]`.
    #[cold]
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:?}", self.data())
    }
}
144
145#[derive(Clone, Default)]
148pub struct ByteArray {
149 data: Option<Bytes>,
150}
151
152impl std::fmt::Debug for ByteArray {
154 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
155 let mut debug_struct = f.debug_struct("ByteArray");
156 match self.as_utf8() {
157 Ok(s) => debug_struct.field("data", &s),
158 Err(_) => debug_struct.field("data", &self.data),
159 };
160 debug_struct.finish()
161 }
162}
163
164impl PartialOrd for ByteArray {
165 fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
166 match (&self.data, &other.data) {
171 (None, None) => Some(Ordering::Equal),
172 (None, Some(_)) => Some(Ordering::Less),
173 (Some(_), None) => Some(Ordering::Greater),
174 (Some(self_data), Some(other_data)) => {
175 self_data.partial_cmp(&other_data)
177 }
178 }
179 }
180}
181
182impl ByteArray {
183 #[inline]
185 pub fn new() -> Self {
186 ByteArray { data: None }
187 }
188
189 #[inline]
191 pub fn len(&self) -> usize {
192 assert!(self.data.is_some());
193 self.data.as_ref().unwrap().len()
194 }
195
196 #[inline]
198 pub fn is_empty(&self) -> bool {
199 self.len() == 0
200 }
201
202 #[inline]
204 pub fn data(&self) -> &[u8] {
205 self.data
206 .as_ref()
207 .expect("set_data should have been called")
208 .as_ref()
209 }
210
211 #[inline]
213 pub fn set_data(&mut self, data: Bytes) {
214 self.data = Some(data);
215 }
216
217 #[inline]
219 pub fn slice(&self, start: usize, len: usize) -> Self {
220 Self::from(
221 self.data
222 .as_ref()
223 .expect("set_data should have been called")
224 .slice(start..start + len),
225 )
226 }
227
228 pub fn as_utf8(&self) -> Result<&str> {
230 self.data
231 .as_ref()
232 .map(|ptr| ptr.as_ref())
233 .ok_or_else(|| general_err!("Can't convert empty byte array to utf8"))
234 .and_then(|bytes| from_utf8(bytes).map_err(|e| e.into()))
235 }
236}
237
238impl From<Vec<u8>> for ByteArray {
239 fn from(buf: Vec<u8>) -> ByteArray {
240 Self {
241 data: Some(buf.into()),
242 }
243 }
244}
245
246impl<'a> From<&'a [u8]> for ByteArray {
247 fn from(b: &'a [u8]) -> ByteArray {
248 let mut v = Vec::new();
249 v.extend_from_slice(b);
250 Self {
251 data: Some(v.into()),
252 }
253 }
254}
255
256impl<'a> From<&'a str> for ByteArray {
257 fn from(s: &'a str) -> ByteArray {
258 let mut v = Vec::new();
259 v.extend_from_slice(s.as_bytes());
260 Self {
261 data: Some(v.into()),
262 }
263 }
264}
265
266impl From<Bytes> for ByteArray {
267 fn from(value: Bytes) -> Self {
268 Self { data: Some(value) }
269 }
270}
271
272impl From<f16> for ByteArray {
273 fn from(value: f16) -> Self {
274 Self::from(value.to_le_bytes().as_slice())
275 }
276}
277
278impl PartialEq for ByteArray {
279 fn eq(&self, other: &ByteArray) -> bool {
280 match (&self.data, &other.data) {
281 (Some(d1), Some(d2)) => d1.as_ref() == d2.as_ref(),
282 (None, None) => true,
283 _ => false,
284 }
285 }
286}
287
288impl fmt::Display for ByteArray {
289 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
290 write!(f, "{:?}", self.data())
291 }
292}
293
294#[repr(transparent)]
309#[derive(Clone, Debug, Default)]
310pub struct FixedLenByteArray(ByteArray);
311
312impl PartialEq for FixedLenByteArray {
313 fn eq(&self, other: &FixedLenByteArray) -> bool {
314 self.0.eq(&other.0)
315 }
316}
317
318impl PartialEq<ByteArray> for FixedLenByteArray {
319 fn eq(&self, other: &ByteArray) -> bool {
320 self.0.eq(other)
321 }
322}
323
324impl PartialEq<FixedLenByteArray> for ByteArray {
325 fn eq(&self, other: &FixedLenByteArray) -> bool {
326 self.eq(&other.0)
327 }
328}
329
330impl fmt::Display for FixedLenByteArray {
331 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
332 self.0.fmt(f)
333 }
334}
335
336impl PartialOrd for FixedLenByteArray {
337 fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
338 self.0.partial_cmp(&other.0)
339 }
340}
341
342impl PartialOrd<FixedLenByteArray> for ByteArray {
343 fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
344 self.partial_cmp(&other.0)
345 }
346}
347
348impl PartialOrd<ByteArray> for FixedLenByteArray {
349 fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
350 self.0.partial_cmp(other)
351 }
352}
353
354impl Deref for FixedLenByteArray {
355 type Target = ByteArray;
356
357 fn deref(&self) -> &Self::Target {
358 &self.0
359 }
360}
361
362impl DerefMut for FixedLenByteArray {
363 fn deref_mut(&mut self) -> &mut Self::Target {
364 &mut self.0
365 }
366}
367
368impl From<ByteArray> for FixedLenByteArray {
369 fn from(other: ByteArray) -> Self {
370 Self(other)
371 }
372}
373
374impl From<Vec<u8>> for FixedLenByteArray {
375 fn from(buf: Vec<u8>) -> FixedLenByteArray {
376 FixedLenByteArray(ByteArray::from(buf))
377 }
378}
379
380impl From<FixedLenByteArray> for ByteArray {
381 fn from(other: FixedLenByteArray) -> Self {
382 other.0
383 }
384}
385
386#[derive(Clone, Debug)]
392pub enum Decimal {
393 Int32 {
395 value: [u8; 4],
397 precision: i32,
399 scale: i32,
401 },
402 Int64 {
404 value: [u8; 8],
406 precision: i32,
408 scale: i32,
410 },
411 Bytes {
413 value: ByteArray,
415 precision: i32,
417 scale: i32,
419 },
420}
421
422impl Decimal {
423 pub fn from_i32(value: i32, precision: i32, scale: i32) -> Self {
425 let bytes = value.to_be_bytes();
426 Decimal::Int32 {
427 value: bytes,
428 precision,
429 scale,
430 }
431 }
432
433 pub fn from_i64(value: i64, precision: i32, scale: i32) -> Self {
435 let bytes = value.to_be_bytes();
436 Decimal::Int64 {
437 value: bytes,
438 precision,
439 scale,
440 }
441 }
442
443 pub fn from_bytes(value: ByteArray, precision: i32, scale: i32) -> Self {
445 Decimal::Bytes {
446 value,
447 precision,
448 scale,
449 }
450 }
451
452 pub fn data(&self) -> &[u8] {
454 match *self {
455 Decimal::Int32 { ref value, .. } => value,
456 Decimal::Int64 { ref value, .. } => value,
457 Decimal::Bytes { ref value, .. } => value.data(),
458 }
459 }
460
461 pub fn precision(&self) -> i32 {
463 match *self {
464 Decimal::Int32 { precision, .. } => precision,
465 Decimal::Int64 { precision, .. } => precision,
466 Decimal::Bytes { precision, .. } => precision,
467 }
468 }
469
470 pub fn scale(&self) -> i32 {
472 match *self {
473 Decimal::Int32 { scale, .. } => scale,
474 Decimal::Int64 { scale, .. } => scale,
475 Decimal::Bytes { scale, .. } => scale,
476 }
477 }
478}
479
480impl Default for Decimal {
481 fn default() -> Self {
482 Self::from_i32(0, 0, 0)
483 }
484}
485
486impl PartialEq for Decimal {
487 fn eq(&self, other: &Decimal) -> bool {
488 self.precision() == other.precision()
489 && self.scale() == other.scale()
490 && self.data() == other.data()
491 }
492}
493
/// Converts an instance of a data type to a slice of bytes as `u8`.
pub trait AsBytes {
    /// Returns a byte view of this value.
    fn as_bytes(&self) -> &[u8];
}

/// Converts a slice of a data type to a slice of bytes.
pub trait SliceAsBytes: Sized {
    /// Returns a byte view of a slice of this data type.
    fn slice_as_bytes(self_: &[Self]) -> &[u8];

    /// Returns a mutable byte view of a slice of this data type.
    ///
    /// # Safety
    ///
    /// Any bytes written through the returned slice must form valid values of `Self`.
    unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8];
}

impl AsBytes for [u8] {
    fn as_bytes(&self) -> &[u8] {
        self
    }
}

/// Implements `AsBytes` and `SliceAsBytes` for a primitive numeric type by viewing its
/// in-memory (native-endian) representation.
macro_rules! gen_as_bytes {
    ($source_ty:ident) => {
        impl AsBytes for $source_ty {
            fn as_bytes(&self) -> &[u8] {
                // A single value is a one-element slice; reuse the slice view.
                <$source_ty as SliceAsBytes>::slice_as_bytes(std::slice::from_ref(self))
            }
        }

        impl SliceAsBytes for $source_ty {
            #[inline]
            #[allow(clippy::size_of_in_element_count)]
            fn slice_as_bytes(self_: &[Self]) -> &[u8] {
                let byte_len = std::mem::size_of_val(self_);
                // SAFETY: `$source_ty` is a primitive numeric type without padding, so all
                // `byte_len` bytes behind `self_` are initialized, and `u8` has alignment 1.
                // The result borrows `self_` for the same lifetime.
                unsafe { std::slice::from_raw_parts(self_.as_ptr().cast::<u8>(), byte_len) }
            }

            #[inline]
            #[allow(clippy::size_of_in_element_count)]
            unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] {
                let byte_len = std::mem::size_of_val(self_);
                // SAFETY: same layout argument as `slice_as_bytes`. The exclusive borrow of
                // `self_` is transferred to the returned slice.
                unsafe {
                    std::slice::from_raw_parts_mut(self_.as_mut_ptr().cast::<u8>(), byte_len)
                }
            }
        }
    };
}

gen_as_bytes!(i8);
gen_as_bytes!(i16);
gen_as_bytes!(i32);
gen_as_bytes!(i64);
gen_as_bytes!(u8);
gen_as_bytes!(u16);
gen_as_bytes!(u32);
gen_as_bytes!(u64);
gen_as_bytes!(f32);
gen_as_bytes!(f64);
575
576macro_rules! unimplemented_slice_as_bytes {
577 ($ty: ty) => {
578 impl SliceAsBytes for $ty {
579 fn slice_as_bytes(_self: &[Self]) -> &[u8] {
580 unimplemented!()
581 }
582
583 unsafe fn slice_as_bytes_mut(_self: &mut [Self]) -> &mut [u8] {
584 unimplemented!()
585 }
586 }
587 };
588}
589
590unimplemented_slice_as_bytes!(Int96);
592unimplemented_slice_as_bytes!(bool);
593unimplemented_slice_as_bytes!(ByteArray);
594unimplemented_slice_as_bytes!(FixedLenByteArray);
595
596impl AsBytes for bool {
597 fn as_bytes(&self) -> &[u8] {
598 unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) }
601 }
602}
603
604impl AsBytes for Int96 {
605 fn as_bytes(&self) -> &[u8] {
606 unsafe { std::slice::from_raw_parts(self.data() as *const [u32] as *const u8, 12) }
608 }
609}
610
611impl AsBytes for ByteArray {
612 fn as_bytes(&self) -> &[u8] {
613 self.data()
614 }
615}
616
617impl AsBytes for FixedLenByteArray {
618 fn as_bytes(&self) -> &[u8] {
619 self.data()
620 }
621}
622
623impl AsBytes for Decimal {
624 fn as_bytes(&self) -> &[u8] {
625 self.data()
626 }
627}
628
629impl AsBytes for Vec<u8> {
630 fn as_bytes(&self) -> &[u8] {
631 self.as_slice()
632 }
633}
634
635impl AsBytes for &str {
636 fn as_bytes(&self) -> &[u8] {
637 (self as &str).as_bytes()
638 }
639}
640
641impl AsBytes for str {
642 fn as_bytes(&self) -> &[u8] {
643 (self as &str).as_bytes()
644 }
645}
646
647pub(crate) mod private {
648 use bytes::Bytes;
649
650 use crate::encodings::decoding::PlainDecoderDetails;
651 use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter};
652
653 use super::{ParquetError, Result, SliceAsBytes};
654 use crate::basic::Type;
655 use crate::file::metadata::HeapSize;
656
657 pub trait ParquetValueType:
663 PartialEq
664 + std::fmt::Debug
665 + std::fmt::Display
666 + Default
667 + Clone
668 + super::AsBytes
669 + super::FromBytes
670 + SliceAsBytes
671 + PartialOrd
672 + Send
673 + HeapSize
674 + crate::encodings::decoding::private::GetDecoder
675 + crate::file::statistics::private::MakeStatistics
676 {
677 const PHYSICAL_TYPE: Type;
678
679 fn encode<W: std::io::Write>(
681 values: &[Self],
682 writer: &mut W,
683 bit_writer: &mut BitWriter,
684 ) -> Result<()>;
685
686 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize);
688
689 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize>;
691
692 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize>;
693
694 fn dict_encoding_size(&self) -> (usize, usize) {
696 (std::mem::size_of::<Self>(), 1)
697 }
698
699 fn variable_length_bytes(_: &[Self]) -> Option<i64> {
703 None
704 }
705
706 fn as_i64(&self) -> Result<i64> {
711 Err(general_err!("Type cannot be converted to i64"))
712 }
713
714 fn as_u64(&self) -> Result<u64> {
719 self.as_i64()
720 .map_err(|_| general_err!("Type cannot be converted to u64"))
721 .map(|x| x as u64)
722 }
723
724 fn as_any(&self) -> &dyn std::any::Any;
726
727 fn as_mut_any(&mut self) -> &mut dyn std::any::Any;
729
730 fn set_from_bytes(&mut self, _data: Bytes) {
734 unimplemented!();
735 }
736 }
737
738 impl ParquetValueType for bool {
739 const PHYSICAL_TYPE: Type = Type::BOOLEAN;
740
741 #[inline]
742 fn encode<W: std::io::Write>(
743 values: &[Self],
744 _: &mut W,
745 bit_writer: &mut BitWriter,
746 ) -> Result<()> {
747 for value in values {
748 bit_writer.put_value(*value as u64, 1)
749 }
750 Ok(())
751 }
752
753 #[inline]
754 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
755 decoder.bit_reader.replace(BitReader::new(data));
756 decoder.num_values = num_values;
757 }
758
759 #[inline]
760 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
761 let bit_reader = decoder.bit_reader.as_mut().unwrap();
762 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
763 let values_read = bit_reader.get_batch(&mut buffer[..num_values], 1);
764 decoder.num_values -= values_read;
765 Ok(values_read)
766 }
767
768 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
769 let bit_reader = decoder.bit_reader.as_mut().unwrap();
770 let num_values = std::cmp::min(num_values, decoder.num_values);
771 let values_read = bit_reader.skip(num_values, 1);
772 decoder.num_values -= values_read;
773 Ok(values_read)
774 }
775
776 #[inline]
777 fn as_i64(&self) -> Result<i64> {
778 Ok(*self as i64)
779 }
780
781 #[inline]
782 fn as_any(&self) -> &dyn std::any::Any {
783 self
784 }
785
786 #[inline]
787 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
788 self
789 }
790 }
791
792 macro_rules! impl_from_raw {
793 ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
794 impl ParquetValueType for $ty {
795 const PHYSICAL_TYPE: Type = $physical_ty;
796
797 #[inline]
798 fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
799 let raw = unsafe {
801 std::slice::from_raw_parts(
802 values.as_ptr() as *const u8,
803 std::mem::size_of_val(values),
804 )
805 };
806 writer.write_all(raw)?;
807
808 Ok(())
809 }
810
811 #[inline]
812 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
813 decoder.data.replace(data);
814 decoder.start = 0;
815 decoder.num_values = num_values;
816 }
817
818 #[inline]
819 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
820 let data = decoder.data.as_ref().expect("set_data should have been called");
821 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
822 let bytes_left = data.len() - decoder.start;
823 let bytes_to_decode = std::mem::size_of::<Self>() * num_values;
824
825 if bytes_left < bytes_to_decode {
826 return Err(eof_err!("Not enough bytes to decode"));
827 }
828
829 {
830 let raw_buffer = &mut unsafe { Self::slice_as_bytes_mut(buffer) }[..bytes_to_decode];
833 raw_buffer.copy_from_slice(data.slice(
834 decoder.start..decoder.start + bytes_to_decode
835 ).as_ref());
836 };
837 decoder.start += bytes_to_decode;
838 decoder.num_values -= num_values;
839
840 Ok(num_values)
841 }
842
843 #[inline]
844 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
845 let data = decoder.data.as_ref().expect("set_data should have been called");
846 let num_values = num_values.min(decoder.num_values);
847 let bytes_left = data.len() - decoder.start;
848 let bytes_to_skip = std::mem::size_of::<Self>() * num_values;
849
850 if bytes_left < bytes_to_skip {
851 return Err(eof_err!("Not enough bytes to skip"));
852 }
853
854 decoder.start += bytes_to_skip;
855 decoder.num_values -= num_values;
856
857 Ok(num_values)
858 }
859
860 #[inline]
861 fn as_i64(&$self) -> Result<i64> {
862 $as_i64
863 }
864
865 #[inline]
866 fn as_any(&self) -> &dyn std::any::Any {
867 self
868 }
869
870 #[inline]
871 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
872 self
873 }
874 }
875 }
876 }
877
878 impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
879 impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
880 impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
881 impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
882
883 impl ParquetValueType for super::Int96 {
884 const PHYSICAL_TYPE: Type = Type::INT96;
885
886 #[inline]
887 fn encode<W: std::io::Write>(
888 values: &[Self],
889 writer: &mut W,
890 _: &mut BitWriter,
891 ) -> Result<()> {
892 for value in values {
893 let raw = SliceAsBytes::slice_as_bytes(value.data());
894 writer.write_all(raw)?;
895 }
896 Ok(())
897 }
898
899 #[inline]
900 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
901 decoder.data.replace(data);
902 decoder.start = 0;
903 decoder.num_values = num_values;
904 }
905
906 #[inline]
907 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
908 let data = decoder
910 .data
911 .as_ref()
912 .expect("set_data should have been called");
913 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
914 let bytes_left = data.len() - decoder.start;
915 let bytes_to_decode = 12 * num_values;
916
917 if bytes_left < bytes_to_decode {
918 return Err(eof_err!("Not enough bytes to decode"));
919 }
920
921 let data_range = data.slice(decoder.start..decoder.start + bytes_to_decode);
922 let bytes: &[u8] = &data_range;
923 decoder.start += bytes_to_decode;
924
925 let mut pos = 0; for item in buffer.iter_mut().take(num_values) {
927 let elem0 = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
928 let elem1 = u32::from_le_bytes(bytes[pos + 4..pos + 8].try_into().unwrap());
929 let elem2 = u32::from_le_bytes(bytes[pos + 8..pos + 12].try_into().unwrap());
930
931 item.set_data(elem0, elem1, elem2);
932 pos += 12;
933 }
934 decoder.num_values -= num_values;
935
936 Ok(num_values)
937 }
938
939 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
940 let data = decoder
941 .data
942 .as_ref()
943 .expect("set_data should have been called");
944 let num_values = std::cmp::min(num_values, decoder.num_values);
945 let bytes_left = data.len() - decoder.start;
946 let bytes_to_skip = 12 * num_values;
947
948 if bytes_left < bytes_to_skip {
949 return Err(eof_err!("Not enough bytes to skip"));
950 }
951 decoder.start += bytes_to_skip;
952 decoder.num_values -= num_values;
953
954 Ok(num_values)
955 }
956
957 #[inline]
958 fn as_any(&self) -> &dyn std::any::Any {
959 self
960 }
961
962 #[inline]
963 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
964 self
965 }
966 }
967
968 impl HeapSize for super::Int96 {
969 fn heap_size(&self) -> usize {
970 0 }
972 }
973
974 impl ParquetValueType for super::ByteArray {
975 const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
976
977 #[inline]
978 fn encode<W: std::io::Write>(
979 values: &[Self],
980 writer: &mut W,
981 _: &mut BitWriter,
982 ) -> Result<()> {
983 for value in values {
984 let len: u32 = value.len().try_into().unwrap();
985 writer.write_all(&len.to_ne_bytes())?;
986 let raw = value.data();
987 writer.write_all(raw)?;
988 }
989 Ok(())
990 }
991
992 #[inline]
993 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
994 decoder.data.replace(data);
995 decoder.start = 0;
996 decoder.num_values = num_values;
997 }
998
999 #[inline]
1000 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1001 let data = decoder
1002 .data
1003 .as_mut()
1004 .expect("set_data should have been called");
1005 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1006 for val_array in buffer.iter_mut().take(num_values) {
1007 let len: usize =
1008 read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1009 decoder.start += std::mem::size_of::<u32>();
1010
1011 if data.len() < decoder.start + len {
1012 return Err(eof_err!("Not enough bytes to decode"));
1013 }
1014
1015 val_array.set_data(data.slice(decoder.start..decoder.start + len));
1016 decoder.start += len;
1017 }
1018 decoder.num_values -= num_values;
1019
1020 Ok(num_values)
1021 }
1022
1023 fn variable_length_bytes(values: &[Self]) -> Option<i64> {
1024 Some(values.iter().map(|x| x.len() as i64).sum())
1025 }
1026
1027 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1028 let data = decoder
1029 .data
1030 .as_mut()
1031 .expect("set_data should have been called");
1032 let num_values = num_values.min(decoder.num_values);
1033
1034 for _ in 0..num_values {
1035 let len: usize =
1036 read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
1037 decoder.start += std::mem::size_of::<u32>() + len;
1038 }
1039 decoder.num_values -= num_values;
1040
1041 Ok(num_values)
1042 }
1043
1044 #[inline]
1045 fn dict_encoding_size(&self) -> (usize, usize) {
1046 (std::mem::size_of::<u32>(), self.len())
1047 }
1048
1049 #[inline]
1050 fn as_any(&self) -> &dyn std::any::Any {
1051 self
1052 }
1053
1054 #[inline]
1055 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1056 self
1057 }
1058
1059 #[inline]
1060 fn set_from_bytes(&mut self, data: Bytes) {
1061 self.set_data(data);
1062 }
1063 }
1064
1065 impl HeapSize for super::ByteArray {
1066 fn heap_size(&self) -> usize {
1067 self.data.as_ref().map(|data| data.len()).unwrap_or(0)
1071 }
1072 }
1073
1074 impl ParquetValueType for super::FixedLenByteArray {
1075 const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
1076
1077 #[inline]
1078 fn encode<W: std::io::Write>(
1079 values: &[Self],
1080 writer: &mut W,
1081 _: &mut BitWriter,
1082 ) -> Result<()> {
1083 for value in values {
1084 let raw = value.data();
1085 writer.write_all(raw)?;
1086 }
1087 Ok(())
1088 }
1089
1090 #[inline]
1091 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1092 decoder.data.replace(data);
1093 decoder.start = 0;
1094 decoder.num_values = num_values;
1095 }
1096
1097 #[inline]
1098 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1099 assert!(decoder.type_length > 0);
1100
1101 let data = decoder
1102 .data
1103 .as_mut()
1104 .expect("set_data should have been called");
1105 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1106
1107 for item in buffer.iter_mut().take(num_values) {
1108 let len = decoder.type_length as usize;
1109
1110 if data.len() < decoder.start + len {
1111 return Err(eof_err!("Not enough bytes to decode"));
1112 }
1113
1114 item.set_data(data.slice(decoder.start..decoder.start + len));
1115 decoder.start += len;
1116 }
1117 decoder.num_values -= num_values;
1118
1119 Ok(num_values)
1120 }
1121
1122 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1123 assert!(decoder.type_length > 0);
1124
1125 let data = decoder
1126 .data
1127 .as_mut()
1128 .expect("set_data should have been called");
1129 let num_values = std::cmp::min(num_values, decoder.num_values);
1130 for _ in 0..num_values {
1131 let len = decoder.type_length as usize;
1132
1133 if data.len() < decoder.start + len {
1134 return Err(eof_err!("Not enough bytes to skip"));
1135 }
1136
1137 decoder.start += len;
1138 }
1139 decoder.num_values -= num_values;
1140
1141 Ok(num_values)
1142 }
1143
1144 #[inline]
1145 fn dict_encoding_size(&self) -> (usize, usize) {
1146 (std::mem::size_of::<u32>(), self.len())
1147 }
1148
1149 #[inline]
1150 fn as_any(&self) -> &dyn std::any::Any {
1151 self
1152 }
1153
1154 #[inline]
1155 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1156 self
1157 }
1158
1159 #[inline]
1160 fn set_from_bytes(&mut self, data: Bytes) {
1161 self.set_data(data);
1162 }
1163 }
1164
1165 impl HeapSize for super::FixedLenByteArray {
1166 fn heap_size(&self) -> usize {
1167 self.0.heap_size()
1168 }
1169 }
1170}
1171
1172pub trait DataType: 'static + Send {
1175 type T: private::ParquetValueType;
1177
1178 fn get_physical_type() -> Type {
1180 <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
1181 }
1182
1183 fn get_type_size() -> usize;
1185
1186 fn get_column_reader(column_writer: ColumnReader) -> Option<ColumnReaderImpl<Self>>
1188 where
1189 Self: Sized;
1190
1191 fn get_column_writer(column_writer: ColumnWriter<'_>) -> Option<ColumnWriterImpl<'_, Self>>
1193 where
1194 Self: Sized;
1195
1196 fn get_column_writer_ref<'a, 'b: 'a>(
1198 column_writer: &'b ColumnWriter<'a>,
1199 ) -> Option<&'b ColumnWriterImpl<'a, Self>>
1200 where
1201 Self: Sized;
1202
1203 fn get_column_writer_mut<'a, 'b: 'a>(
1205 column_writer: &'a mut ColumnWriter<'b>,
1206 ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
1207 where
1208 Self: Sized;
1209}
1210
1211macro_rules! make_type {
1212 ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
1213 #[doc = concat!("Parquet physical type: ", stringify!($name))]
1214 #[derive(Clone)]
1215 pub struct $name {}
1216
1217 impl DataType for $name {
1218 type T = $native_ty;
1219
1220 fn get_type_size() -> usize {
1221 $size
1222 }
1223
1224 fn get_column_reader(column_reader: ColumnReader) -> Option<ColumnReaderImpl<Self>> {
1225 match column_reader {
1226 ColumnReader::$reader_ident(w) => Some(w),
1227 _ => None,
1228 }
1229 }
1230
1231 fn get_column_writer(
1232 column_writer: ColumnWriter<'_>,
1233 ) -> Option<ColumnWriterImpl<'_, Self>> {
1234 match column_writer {
1235 ColumnWriter::$writer_ident(w) => Some(w),
1236 _ => None,
1237 }
1238 }
1239
1240 fn get_column_writer_ref<'a, 'b: 'a>(
1241 column_writer: &'a ColumnWriter<'b>,
1242 ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
1243 match column_writer {
1244 ColumnWriter::$writer_ident(w) => Some(w),
1245 _ => None,
1246 }
1247 }
1248
1249 fn get_column_writer_mut<'a, 'b: 'a>(
1250 column_writer: &'a mut ColumnWriter<'b>,
1251 ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
1252 match column_writer {
1253 ColumnWriter::$writer_ident(w) => Some(w),
1254 _ => None,
1255 }
1256 }
1257 }
1258 };
1259}
1260
1261make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
1264make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
1265make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
1266make_type!(
1267 Int96Type,
1268 Int96ColumnReader,
1269 Int96ColumnWriter,
1270 Int96,
1271 mem::size_of::<Int96>()
1272);
1273make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
1274make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
1275make_type!(
1276 ByteArrayType,
1277 ByteArrayColumnReader,
1278 ByteArrayColumnWriter,
1279 ByteArray,
1280 mem::size_of::<ByteArray>()
1281);
1282make_type!(
1283 FixedLenByteArrayType,
1284 FixedLenByteArrayColumnReader,
1285 FixedLenByteArrayColumnWriter,
1286 FixedLenByteArray,
1287 mem::size_of::<FixedLenByteArray>()
1288);
1289
1290impl AsRef<[u8]> for ByteArray {
1291 fn as_ref(&self) -> &[u8] {
1292 self.as_bytes()
1293 }
1294}
1295
1296impl AsRef<[u8]> for FixedLenByteArray {
1297 fn as_ref(&self) -> &[u8] {
1298 self.as_bytes()
1299 }
1300}
1301
/// Panics with `$err` unless `T::get_physical_type()` matches one of the given patterns.
///
/// The call site must have a type parameter named `T` in scope that provides
/// `get_physical_type()` (presumably a [`DataType`]).
macro_rules! ensure_phys_ty {
    ($($ty:pat_param)|+ , $err: literal) => {
        match T::get_physical_type() {
            $($ty)|+ => (),
            _ => panic!($err),
        };
    };
}
1311
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_as_bytes() {
        // Int96 words appear one after another (little-endian on little-endian hosts).
        let int96 = Int96::from(vec![1, 2, 3]);
        assert_eq!(int96.as_bytes(), &[1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]);

        let byte_array = ByteArray::from(vec![1, 2, 3]);
        assert_eq!(byte_array.as_bytes(), &[1, 2, 3]);

        // Decimals expose the bytes of their unscaled value.
        let cases = [
            (Decimal::from_i32(123, 5, 2), vec![0u8, 0, 0, 123]),
            (Decimal::from_i64(123, 5, 2), vec![0u8, 0, 0, 0, 0, 0, 0, 123]),
            (
                Decimal::from_bytes(ByteArray::from(vec![1, 2, 3]), 5, 2),
                vec![1u8, 2, 3],
            ),
        ];
        for (decimal, expected) in cases.iter() {
            assert_eq!(decimal.as_bytes(), expected.as_slice());
        }
    }

    #[test]
    fn test_int96_from() {
        let words = [1, 12345, 1234567890];
        assert_eq!(Int96::from(words.to_vec()).data(), &words);
    }

    #[test]
    fn test_byte_array_from() {
        assert_eq!(ByteArray::from(b"ABC".to_vec()).data(), b"ABC");
        assert_eq!(ByteArray::from("ABC").data(), b"ABC");

        let shared = Bytes::from(vec![1u8, 2u8, 3u8, 4u8, 5u8]);
        assert_eq!(ByteArray::from(shared).data(), &[1u8, 2u8, 3u8, 4u8, 5u8]);

        let owned = vec![6u8, 7u8, 8u8, 9u8, 10u8];
        assert_eq!(ByteArray::from(owned).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
    }

    #[test]
    fn test_decimal_partial_eq() {
        assert_eq!(Decimal::default(), Decimal::from_i32(0, 0, 0));
        assert_eq!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 2));
        // Equality compares bytes, so a byte-backed decimal can equal an i32-backed one.
        assert_eq!(
            Decimal::from_bytes(ByteArray::from(vec![0, 0, 0, 3]), 5, 2),
            Decimal::from_i32(3, 5, 2)
        );

        let base = Decimal::from_i32(222, 5, 2);
        assert!(base != Decimal::from_i32(111, 5, 2));
        assert!(base != Decimal::from_i32(222, 6, 2));
        assert!(base != Decimal::from_i32(222, 5, 3));

        // Different byte widths never compare equal.
        assert!(Decimal::from_i64(222, 5, 2) != base);
    }

    #[test]
    fn test_byte_array_ord() {
        let abc = ByteArray::from(vec![1, 2, 3]);
        let abc_again = ByteArray::from(vec![1, 2, 3]);
        let short_high = ByteArray::from(vec![3, 4]);
        let abd = ByteArray::from(vec![1, 2, 4]);
        let empty = ByteArray::from(vec![]);
        let bbc = ByteArray::from(vec![2, 2, 3]);

        assert!(abc < short_high);
        assert!(abd > abc);
        assert!(abc > empty);
        assert_eq!(abc, abc_again);
        assert!(bbc > abc);
    }
}