1use bytes::Bytes;
21use half::f16;
22use std::cmp::Ordering;
23use std::fmt;
24use std::mem;
25use std::ops::{Deref, DerefMut};
26use std::str::from_utf8;
27
28use crate::basic::Type;
29use crate::column::reader::{ColumnReader, ColumnReaderImpl};
30use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
31use crate::errors::{ParquetError, Result};
32use crate::util::bit_util::FromBytes;
33
/// A 96-bit value stored as three `u32` words.
///
/// The conversion helpers treat word 2 as a Julian day number and words 1
/// (high half) and 0 (low half) as a 64-bit nanosecond count within that day.
#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
pub struct Int96 {
    value: [u32; 3],
}

impl Int96 {
    /// Creates an `Int96` whose three words are all zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the three underlying `u32` words.
    #[inline]
    pub fn data(&self) -> &[u32] {
        &self.value[..]
    }

    /// Overwrites the three words with `elem0`, `elem1` and `elem2`, in that order.
    #[inline]
    pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) {
        self.value[0] = elem0;
        self.value[1] = elem1;
        self.value[2] = elem2;
    }

    /// Converts this value to milliseconds relative to the Unix epoch.
    ///
    /// Sub-millisecond nanoseconds are discarded by integer division.
    pub fn to_i64(&self) -> i64 {
        const MILLIS_PER_SECOND: i64 = 1_000;
        const NANOS_PER_MILLI: i64 = 1_000_000;

        let (secs, nanos) = self.to_seconds_and_nanos();
        secs * MILLIS_PER_SECOND + nanos / NANOS_PER_MILLI
    }

    /// Converts this value to nanoseconds relative to the Unix epoch.
    ///
    /// The arithmetic wraps around on overflow instead of panicking.
    pub fn to_nanos(&self) -> i64 {
        const NANOS_PER_SECOND: i64 = 1_000_000_000;

        let (secs, nanos) = self.to_seconds_and_nanos();
        secs.wrapping_mul(NANOS_PER_SECOND).wrapping_add(nanos)
    }

    /// Splits this value into `(seconds, nanoseconds)`.
    ///
    /// `seconds` is the whole-day offset from the Unix epoch expressed in
    /// seconds; `nanoseconds` is the 64-bit count assembled from words 1 and 0.
    pub fn to_seconds_and_nanos(&self) -> (i64, i64) {
        const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
        const SECONDS_PER_DAY: i64 = 86_400;

        let [low, high, julian_day] = self.value;
        let nanos_of_day = (i64::from(high) << 32) + i64::from(low);
        let days_since_epoch = i64::from(julian_day) - JULIAN_DAY_OF_EPOCH;
        (days_since_epoch * SECONDS_PER_DAY, nanos_of_day)
    }
}
86
87impl From<Vec<u32>> for Int96 {
88 fn from(buf: Vec<u32>) -> Self {
89 assert_eq!(buf.len(), 3);
90 let mut result = Self::new();
91 result.set_data(buf[0], buf[1], buf[2]);
92 result
93 }
94}
95
96impl fmt::Display for Int96 {
97 #[cold]
98 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99 write!(f, "{:?}", self.data())
100 }
101}
102
/// A byte sequence backed by an optional [`Bytes`] buffer.
///
/// `data` is `None` for values created through `new()` or `Default` and stays
/// that way until a buffer is supplied (for example via `set_data`).
#[derive(Clone, Default)]
pub struct ByteArray {
    data: Option<Bytes>,
}
109
110impl std::fmt::Debug for ByteArray {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 let mut debug_struct = f.debug_struct("ByteArray");
114 match self.as_utf8() {
115 Ok(s) => debug_struct.field("data", &s),
116 Err(_) => debug_struct.field("data", &self.data),
117 };
118 debug_struct.finish()
119 }
120}
121
122impl PartialOrd for ByteArray {
123 fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
124 match (&self.data, &other.data) {
129 (None, None) => Some(Ordering::Equal),
130 (None, Some(_)) => Some(Ordering::Less),
131 (Some(_), None) => Some(Ordering::Greater),
132 (Some(self_data), Some(other_data)) => {
133 self_data.partial_cmp(&other_data)
135 }
136 }
137 }
138}
139
140impl ByteArray {
141 #[inline]
143 pub fn new() -> Self {
144 ByteArray { data: None }
145 }
146
147 #[inline]
149 pub fn len(&self) -> usize {
150 assert!(self.data.is_some());
151 self.data.as_ref().unwrap().len()
152 }
153
154 #[inline]
156 pub fn is_empty(&self) -> bool {
157 self.len() == 0
158 }
159
160 #[inline]
162 pub fn data(&self) -> &[u8] {
163 self.data
164 .as_ref()
165 .expect("set_data should have been called")
166 .as_ref()
167 }
168
169 #[inline]
171 pub fn set_data(&mut self, data: Bytes) {
172 self.data = Some(data);
173 }
174
175 #[inline]
177 pub fn slice(&self, start: usize, len: usize) -> Self {
178 Self::from(
179 self.data
180 .as_ref()
181 .expect("set_data should have been called")
182 .slice(start..start + len),
183 )
184 }
185
186 pub fn as_utf8(&self) -> Result<&str> {
188 self.data
189 .as_ref()
190 .map(|ptr| ptr.as_ref())
191 .ok_or_else(|| general_err!("Can't convert empty byte array to utf8"))
192 .and_then(|bytes| from_utf8(bytes).map_err(|e| e.into()))
193 }
194}
195
196impl From<Vec<u8>> for ByteArray {
197 fn from(buf: Vec<u8>) -> ByteArray {
198 Self {
199 data: Some(buf.into()),
200 }
201 }
202}
203
204impl<'a> From<&'a [u8]> for ByteArray {
205 fn from(b: &'a [u8]) -> ByteArray {
206 let mut v = Vec::new();
207 v.extend_from_slice(b);
208 Self {
209 data: Some(v.into()),
210 }
211 }
212}
213
214impl<'a> From<&'a str> for ByteArray {
215 fn from(s: &'a str) -> ByteArray {
216 let mut v = Vec::new();
217 v.extend_from_slice(s.as_bytes());
218 Self {
219 data: Some(v.into()),
220 }
221 }
222}
223
224impl From<Bytes> for ByteArray {
225 fn from(value: Bytes) -> Self {
226 Self { data: Some(value) }
227 }
228}
229
230impl From<f16> for ByteArray {
231 fn from(value: f16) -> Self {
232 Self::from(value.to_le_bytes().as_slice())
233 }
234}
235
236impl PartialEq for ByteArray {
237 fn eq(&self, other: &ByteArray) -> bool {
238 match (&self.data, &other.data) {
239 (Some(d1), Some(d2)) => d1.as_ref() == d2.as_ref(),
240 (None, None) => true,
241 _ => false,
242 }
243 }
244}
245
246impl fmt::Display for ByteArray {
247 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
248 write!(f, "{:?}", self.data())
249 }
250}
251
/// Newtype wrapper around [`ByteArray`].
///
/// `#[repr(transparent)]` gives it the same layout as the wrapped `ByteArray`.
#[repr(transparent)]
#[derive(Clone, Debug, Default)]
pub struct FixedLenByteArray(ByteArray);
269
270impl PartialEq for FixedLenByteArray {
271 fn eq(&self, other: &FixedLenByteArray) -> bool {
272 self.0.eq(&other.0)
273 }
274}
275
276impl PartialEq<ByteArray> for FixedLenByteArray {
277 fn eq(&self, other: &ByteArray) -> bool {
278 self.0.eq(other)
279 }
280}
281
282impl PartialEq<FixedLenByteArray> for ByteArray {
283 fn eq(&self, other: &FixedLenByteArray) -> bool {
284 self.eq(&other.0)
285 }
286}
287
impl fmt::Display for FixedLenByteArray {
    /// Formats the value by forwarding to the `fmt` method of the wrapped
    /// [`ByteArray`].
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        self.0.fmt(f)
    }
}
293
294impl PartialOrd for FixedLenByteArray {
295 fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
296 self.0.partial_cmp(&other.0)
297 }
298}
299
300impl PartialOrd<FixedLenByteArray> for ByteArray {
301 fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
302 self.partial_cmp(&other.0)
303 }
304}
305
306impl PartialOrd<ByteArray> for FixedLenByteArray {
307 fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
308 self.0.partial_cmp(other)
309 }
310}
311
312impl Deref for FixedLenByteArray {
313 type Target = ByteArray;
314
315 fn deref(&self) -> &Self::Target {
316 &self.0
317 }
318}
319
320impl DerefMut for FixedLenByteArray {
321 fn deref_mut(&mut self) -> &mut Self::Target {
322 &mut self.0
323 }
324}
325
326impl From<ByteArray> for FixedLenByteArray {
327 fn from(other: ByteArray) -> Self {
328 Self(other)
329 }
330}
331
332impl From<Vec<u8>> for FixedLenByteArray {
333 fn from(buf: Vec<u8>) -> FixedLenByteArray {
334 FixedLenByteArray(ByteArray::from(buf))
335 }
336}
337
338impl From<FixedLenByteArray> for ByteArray {
339 fn from(other: FixedLenByteArray) -> Self {
340 other.0
341 }
342}
343
344#[derive(Clone, Debug)]
350pub enum Decimal {
351 Int32 {
353 value: [u8; 4],
355 precision: i32,
357 scale: i32,
359 },
360 Int64 {
362 value: [u8; 8],
364 precision: i32,
366 scale: i32,
368 },
369 Bytes {
371 value: ByteArray,
373 precision: i32,
375 scale: i32,
377 },
378}
379
380impl Decimal {
381 pub fn from_i32(value: i32, precision: i32, scale: i32) -> Self {
383 let bytes = value.to_be_bytes();
384 Decimal::Int32 {
385 value: bytes,
386 precision,
387 scale,
388 }
389 }
390
391 pub fn from_i64(value: i64, precision: i32, scale: i32) -> Self {
393 let bytes = value.to_be_bytes();
394 Decimal::Int64 {
395 value: bytes,
396 precision,
397 scale,
398 }
399 }
400
401 pub fn from_bytes(value: ByteArray, precision: i32, scale: i32) -> Self {
403 Decimal::Bytes {
404 value,
405 precision,
406 scale,
407 }
408 }
409
410 pub fn data(&self) -> &[u8] {
412 match *self {
413 Decimal::Int32 { ref value, .. } => value,
414 Decimal::Int64 { ref value, .. } => value,
415 Decimal::Bytes { ref value, .. } => value.data(),
416 }
417 }
418
419 pub fn precision(&self) -> i32 {
421 match *self {
422 Decimal::Int32 { precision, .. } => precision,
423 Decimal::Int64 { precision, .. } => precision,
424 Decimal::Bytes { precision, .. } => precision,
425 }
426 }
427
428 pub fn scale(&self) -> i32 {
430 match *self {
431 Decimal::Int32 { scale, .. } => scale,
432 Decimal::Int64 { scale, .. } => scale,
433 Decimal::Bytes { scale, .. } => scale,
434 }
435 }
436}
437
438impl Default for Decimal {
439 fn default() -> Self {
440 Self::from_i32(0, 0, 0)
441 }
442}
443
444impl PartialEq for Decimal {
445 fn eq(&self, other: &Decimal) -> bool {
446 self.precision() == other.precision()
447 && self.scale() == other.scale()
448 && self.data() == other.data()
449 }
450}
451
/// Exposes a value as a borrowed slice of bytes.
pub trait AsBytes {
    /// Returns the bytes of `self`.
    fn as_bytes(&self) -> &[u8];
}
457
/// Exposes a slice of values as a slice of bytes.
pub trait SliceAsBytes: Sized {
    /// Returns the bytes of the slice `self_`.
    fn slice_as_bytes(self_: &[Self]) -> &[u8];
    /// Returns the bytes of the slice `self_` for mutation.
    ///
    /// # Safety
    ///
    /// NOTE(review): the original safety contract is not visible in this file.
    /// Presumably callers must only write bytes that leave every element of
    /// `self_` a valid `Self` — confirm before relying on it.
    unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8];
}
469
470impl AsBytes for [u8] {
471 fn as_bytes(&self) -> &[u8] {
472 self
473 }
474}
475
// Implements `AsBytes` and `SliceAsBytes` for `$source_ty` by reinterpreting
// the memory of the value (or slice of values) as raw bytes, i.e. in the
// in-memory (native-endian) representation.
macro_rules! gen_as_bytes {
    ($source_ty:ident) => {
        impl AsBytes for $source_ty {
            #[allow(clippy::size_of_in_element_count)]
            fn as_bytes(&self) -> &[u8] {
                // SAFETY: the macro is only invoked with primitive integer and
                // float types, which consist of `size_of::<$source_ty>()`
                // initialized bytes; the slice borrows from `self`.
                unsafe {
                    std::slice::from_raw_parts(
                        self as *const $source_ty as *const u8,
                        std::mem::size_of::<$source_ty>(),
                    )
                }
            }
        }

        impl SliceAsBytes for $source_ty {
            #[inline]
            #[allow(clippy::size_of_in_element_count)]
            fn slice_as_bytes(self_: &[Self]) -> &[u8] {
                // SAFETY: same reasoning as above, applied to the whole slice;
                // `size_of_val` gives the slice's total length in bytes.
                unsafe {
                    std::slice::from_raw_parts(
                        self_.as_ptr() as *const u8,
                        std::mem::size_of_val(self_),
                    )
                }
            }

            #[inline]
            #[allow(clippy::size_of_in_element_count)]
            unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] {
                // SAFETY: the byte view covers exactly the memory of `self_`
                // and holds the only (mutable) borrow of it; any remaining
                // obligations are passed on to the caller of this `unsafe fn`.
                unsafe {
                    std::slice::from_raw_parts_mut(
                        self_.as_mut_ptr() as *mut u8,
                        std::mem::size_of_val(self_),
                    )
                }
            }
        }
    };
}
522
// Byte-view implementations for the primitive integer and float types.
gen_as_bytes!(i8);
gen_as_bytes!(i16);
gen_as_bytes!(i32);
gen_as_bytes!(i64);
gen_as_bytes!(u8);
gen_as_bytes!(u16);
gen_as_bytes!(u32);
gen_as_bytes!(u64);
gen_as_bytes!(f32);
gen_as_bytes!(f64);
533
// Provides a placeholder `SliceAsBytes` implementation for `$ty` in which both
// methods panic via `unimplemented!()` when called.
macro_rules! unimplemented_slice_as_bytes {
    ($ty: ty) => {
        impl SliceAsBytes for $ty {
            fn slice_as_bytes(_self: &[Self]) -> &[u8] {
                unimplemented!()
            }

            unsafe fn slice_as_bytes_mut(_self: &mut [Self]) -> &mut [u8] {
                unimplemented!()
            }
        }
    };
}
547
// These types only get the panicking placeholder `SliceAsBytes`
// implementation; the trait is a supertrait of `ParquetValueType`.
unimplemented_slice_as_bytes!(Int96);
unimplemented_slice_as_bytes!(bool);
unimplemented_slice_as_bytes!(ByteArray);
unimplemented_slice_as_bytes!(FixedLenByteArray);
553
impl AsBytes for bool {
    /// Returns the single byte that stores this `bool`.
    fn as_bytes(&self) -> &[u8] {
        // SAFETY: a `bool` occupies exactly one initialized byte, and the
        // returned slice borrows from `self`.
        unsafe { std::slice::from_raw_parts(self as *const bool as *const u8, 1) }
    }
}
561
impl AsBytes for Int96 {
    /// Returns the 12 bytes of the three `u32` words in their in-memory layout.
    fn as_bytes(&self) -> &[u8] {
        // SAFETY: `data()` borrows the `[u32; 3]` field, which is 12 contiguous
        // initialized bytes; the returned slice borrows from `self`.
        unsafe { std::slice::from_raw_parts(self.data() as *const [u32] as *const u8, 12) }
    }
}
568
569impl AsBytes for ByteArray {
570 fn as_bytes(&self) -> &[u8] {
571 self.data()
572 }
573}
574
575impl AsBytes for FixedLenByteArray {
576 fn as_bytes(&self) -> &[u8] {
577 self.data()
578 }
579}
580
581impl AsBytes for Decimal {
582 fn as_bytes(&self) -> &[u8] {
583 self.data()
584 }
585}
586
587impl AsBytes for Vec<u8> {
588 fn as_bytes(&self) -> &[u8] {
589 self.as_slice()
590 }
591}
592
593impl AsBytes for &str {
594 fn as_bytes(&self) -> &[u8] {
595 (self as &str).as_bytes()
596 }
597}
598
599impl AsBytes for str {
600 fn as_bytes(&self) -> &[u8] {
601 (self as &str).as_bytes()
602 }
603}
604
605pub(crate) mod private {
606 use bytes::Bytes;
607
608 use crate::encodings::decoding::PlainDecoderDetails;
609 use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter};
610
611 use super::{ParquetError, Result, SliceAsBytes};
612 use crate::basic::Type;
613 use crate::file::metadata::HeapSize;
614
    /// Native value type backing a Parquet physical type.
    ///
    /// Bundles the bounds and the plain encode/decode/skip operations that the
    /// implementations in this module provide for each supported value type.
    pub trait ParquetValueType:
        PartialEq
        + std::fmt::Debug
        + std::fmt::Display
        + Default
        + Clone
        + super::AsBytes
        + super::FromBytes
        + SliceAsBytes
        + PartialOrd
        + Send
        + HeapSize
        + crate::encodings::decoding::private::GetDecoder
        + crate::file::statistics::private::MakeStatistics
    {
        /// The Parquet physical type this value type corresponds to.
        const PHYSICAL_TYPE: Type;

        /// Encodes `values`, writing either to `writer` or to `bit_writer`
        /// depending on the implementing type.
        fn encode<W: std::io::Write>(
            values: &[Self],
            writer: &mut W,
            bit_writer: &mut BitWriter,
        ) -> Result<()>;

        /// Installs `data` as the decoder's input and records that it holds
        /// `num_values` values.
        fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize);

        /// Decodes values into `buffer`, returning how many were decoded.
        fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize>;

        /// Skips up to `num_values` values, returning how many were skipped.
        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize>;

        /// Returns a pair of sizes used for dictionary encoding; the default is
        /// `(size_of::<Self>(), 1)`.
        ///
        /// NOTE(review): the exact meaning of the two components is defined by
        /// the callers, which are not visible in this file.
        fn dict_encoding_size(&self) -> (usize, usize) {
            (std::mem::size_of::<Self>(), 1)
        }

        /// Returns the total variable-length byte count of the given values,
        /// or `None` (the default) for types that do not report one.
        fn variable_length_bytes(_: &[Self]) -> Option<i64> {
            None
        }

        /// Converts the value to `i64`; the default implementation always
        /// returns an error.
        fn as_i64(&self) -> Result<i64> {
            Err(general_err!("Type cannot be converted to i64"))
        }

        /// Converts the value to `u64` by casting the result of `as_i64`
        /// (`as` cast, so negative values wrap).
        fn as_u64(&self) -> Result<u64> {
            self.as_i64()
                .map_err(|_| general_err!("Type cannot be converted to u64"))
                .map(|x| x as u64)
        }

        /// Returns `self` as `&dyn Any` for downcasting.
        fn as_any(&self) -> &dyn std::any::Any;

        /// Returns `self` as `&mut dyn Any` for downcasting.
        fn as_mut_any(&mut self) -> &mut dyn std::any::Any;

        /// Replaces the value's contents with `_data`.
        ///
        /// The default implementation panics with `unimplemented!()`; the byte
        /// array types in this module override it.
        fn set_from_bytes(&mut self, _data: Bytes) {
            unimplemented!();
        }
    }
695
696 impl ParquetValueType for bool {
697 const PHYSICAL_TYPE: Type = Type::BOOLEAN;
698
699 #[inline]
700 fn encode<W: std::io::Write>(
701 values: &[Self],
702 _: &mut W,
703 bit_writer: &mut BitWriter,
704 ) -> Result<()> {
705 for value in values {
706 bit_writer.put_value(*value as u64, 1)
707 }
708 Ok(())
709 }
710
711 #[inline]
712 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
713 decoder.bit_reader.replace(BitReader::new(data));
714 decoder.num_values = num_values;
715 }
716
717 #[inline]
718 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
719 let bit_reader = decoder.bit_reader.as_mut().unwrap();
720 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
721 let values_read = bit_reader.get_batch(&mut buffer[..num_values], 1);
722 decoder.num_values -= values_read;
723 Ok(values_read)
724 }
725
726 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
727 let bit_reader = decoder.bit_reader.as_mut().unwrap();
728 let num_values = std::cmp::min(num_values, decoder.num_values);
729 let values_read = bit_reader.skip(num_values, 1);
730 decoder.num_values -= values_read;
731 Ok(values_read)
732 }
733
734 #[inline]
735 fn as_i64(&self) -> Result<i64> {
736 Ok(*self as i64)
737 }
738
739 #[inline]
740 fn as_any(&self) -> &dyn std::any::Any {
741 self
742 }
743
744 #[inline]
745 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
746 self
747 }
748 }
749
    // Implements `ParquetValueType` for a fixed-width primitive `$ty` whose
    // in-memory bytes are copied directly to and from the data buffer.
    // `$as_i64` is the body used for `as_i64`, with `$self` naming the receiver.
    macro_rules! impl_from_raw {
        ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
            impl ParquetValueType for $ty {
                const PHYSICAL_TYPE: Type = $physical_ty;

                #[inline]
                fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
                    // SAFETY: `values` is a valid slice of a primitive numeric
                    // type, so its memory is `size_of_val(values)` initialized
                    // bytes; the byte view borrows from `values`.
                    let raw = unsafe {
                        std::slice::from_raw_parts(
                            values.as_ptr() as *const u8,
                            std::mem::size_of_val(values),
                        )
                    };
                    writer.write_all(raw)?;

                    Ok(())
                }

                #[inline]
                fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
                    decoder.data.replace(data);
                    decoder.start = 0;
                    decoder.num_values = num_values;
                }

                #[inline]
                fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
                    let data = decoder.data.as_ref().expect("set_data should have been called");
                    // Decode no more than the buffer can hold or the decoder has left.
                    let num_values = std::cmp::min(buffer.len(), decoder.num_values);
                    let bytes_left = data.len() - decoder.start;
                    let bytes_to_decode = std::mem::size_of::<Self>() * num_values;

                    if bytes_left < bytes_to_decode {
                        return Err(eof_err!("Not enough bytes to decode"));
                    }

                    {
                        // SAFETY: only invoked for primitive numeric types, for
                        // which every byte pattern is a valid value, so writing
                        // raw input bytes into `buffer` cannot create an invalid
                        // element.
                        let raw_buffer = &mut unsafe { Self::slice_as_bytes_mut(buffer) }[..bytes_to_decode];
                        raw_buffer.copy_from_slice(data.slice(
                            decoder.start..decoder.start + bytes_to_decode
                        ).as_ref());
                    };
                    decoder.start += bytes_to_decode;
                    decoder.num_values -= num_values;

                    Ok(num_values)
                }

                #[inline]
                fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
                    let data = decoder.data.as_ref().expect("set_data should have been called");
                    // Skip no more values than the decoder has left.
                    let num_values = num_values.min(decoder.num_values);
                    let bytes_left = data.len() - decoder.start;
                    let bytes_to_skip = std::mem::size_of::<Self>() * num_values;

                    if bytes_left < bytes_to_skip {
                        return Err(eof_err!("Not enough bytes to skip"));
                    }

                    decoder.start += bytes_to_skip;
                    decoder.num_values -= num_values;

                    Ok(num_values)
                }

                #[inline]
                fn as_i64(&$self) -> Result<i64> {
                    $as_i64
                }

                #[inline]
                fn as_any(&self) -> &dyn std::any::Any {
                    self
                }

                #[inline]
                fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
                    self
                }
            }
        }
    }
835
    // Integer types convert to `i64` directly; floating-point types report an
    // error from `as_i64`.
    impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
    impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
    impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
    impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
840
841 impl ParquetValueType for super::Int96 {
842 const PHYSICAL_TYPE: Type = Type::INT96;
843
844 #[inline]
845 fn encode<W: std::io::Write>(
846 values: &[Self],
847 writer: &mut W,
848 _: &mut BitWriter,
849 ) -> Result<()> {
850 for value in values {
851 let raw = SliceAsBytes::slice_as_bytes(value.data());
852 writer.write_all(raw)?;
853 }
854 Ok(())
855 }
856
857 #[inline]
858 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
859 decoder.data.replace(data);
860 decoder.start = 0;
861 decoder.num_values = num_values;
862 }
863
864 #[inline]
865 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
866 let data = decoder
868 .data
869 .as_ref()
870 .expect("set_data should have been called");
871 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
872 let bytes_left = data.len() - decoder.start;
873 let bytes_to_decode = 12 * num_values;
874
875 if bytes_left < bytes_to_decode {
876 return Err(eof_err!("Not enough bytes to decode"));
877 }
878
879 let data_range = data.slice(decoder.start..decoder.start + bytes_to_decode);
880 let bytes: &[u8] = &data_range;
881 decoder.start += bytes_to_decode;
882
883 let mut pos = 0; for item in buffer.iter_mut().take(num_values) {
885 let elem0 = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
886 let elem1 = u32::from_le_bytes(bytes[pos + 4..pos + 8].try_into().unwrap());
887 let elem2 = u32::from_le_bytes(bytes[pos + 8..pos + 12].try_into().unwrap());
888
889 item.set_data(elem0, elem1, elem2);
890 pos += 12;
891 }
892 decoder.num_values -= num_values;
893
894 Ok(num_values)
895 }
896
897 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
898 let data = decoder
899 .data
900 .as_ref()
901 .expect("set_data should have been called");
902 let num_values = std::cmp::min(num_values, decoder.num_values);
903 let bytes_left = data.len() - decoder.start;
904 let bytes_to_skip = 12 * num_values;
905
906 if bytes_left < bytes_to_skip {
907 return Err(eof_err!("Not enough bytes to skip"));
908 }
909 decoder.start += bytes_to_skip;
910 decoder.num_values -= num_values;
911
912 Ok(num_values)
913 }
914
915 #[inline]
916 fn as_any(&self) -> &dyn std::any::Any {
917 self
918 }
919
920 #[inline]
921 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
922 self
923 }
924 }
925
    impl HeapSize for super::Int96 {
        /// Always reports zero heap bytes.
        fn heap_size(&self) -> usize {
            // The three words are stored inline in the struct.
            0 }
    }
931
932 impl ParquetValueType for super::ByteArray {
933 const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
934
935 #[inline]
936 fn encode<W: std::io::Write>(
937 values: &[Self],
938 writer: &mut W,
939 _: &mut BitWriter,
940 ) -> Result<()> {
941 for value in values {
942 let len: u32 = value.len().try_into().unwrap();
943 writer.write_all(&len.to_ne_bytes())?;
944 let raw = value.data();
945 writer.write_all(raw)?;
946 }
947 Ok(())
948 }
949
950 #[inline]
951 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
952 decoder.data.replace(data);
953 decoder.start = 0;
954 decoder.num_values = num_values;
955 }
956
957 #[inline]
958 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
959 let data = decoder
960 .data
961 .as_mut()
962 .expect("set_data should have been called");
963 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
964 for val_array in buffer.iter_mut().take(num_values) {
965 let len: usize =
966 read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
967 decoder.start += std::mem::size_of::<u32>();
968
969 if data.len() < decoder.start + len {
970 return Err(eof_err!("Not enough bytes to decode"));
971 }
972
973 val_array.set_data(data.slice(decoder.start..decoder.start + len));
974 decoder.start += len;
975 }
976 decoder.num_values -= num_values;
977
978 Ok(num_values)
979 }
980
981 fn variable_length_bytes(values: &[Self]) -> Option<i64> {
982 Some(values.iter().map(|x| x.len() as i64).sum())
983 }
984
985 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
986 let data = decoder
987 .data
988 .as_mut()
989 .expect("set_data should have been called");
990 let num_values = num_values.min(decoder.num_values);
991
992 for _ in 0..num_values {
993 let len: usize =
994 read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
995 decoder.start += std::mem::size_of::<u32>() + len;
996 }
997 decoder.num_values -= num_values;
998
999 Ok(num_values)
1000 }
1001
1002 #[inline]
1003 fn dict_encoding_size(&self) -> (usize, usize) {
1004 (std::mem::size_of::<u32>(), self.len())
1005 }
1006
1007 #[inline]
1008 fn as_any(&self) -> &dyn std::any::Any {
1009 self
1010 }
1011
1012 #[inline]
1013 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1014 self
1015 }
1016
1017 #[inline]
1018 fn set_from_bytes(&mut self, data: Bytes) {
1019 self.set_data(data);
1020 }
1021 }
1022
1023 impl HeapSize for super::ByteArray {
1024 fn heap_size(&self) -> usize {
1025 self.data.as_ref().map(|data| data.len()).unwrap_or(0)
1029 }
1030 }
1031
1032 impl ParquetValueType for super::FixedLenByteArray {
1033 const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;
1034
1035 #[inline]
1036 fn encode<W: std::io::Write>(
1037 values: &[Self],
1038 writer: &mut W,
1039 _: &mut BitWriter,
1040 ) -> Result<()> {
1041 for value in values {
1042 let raw = value.data();
1043 writer.write_all(raw)?;
1044 }
1045 Ok(())
1046 }
1047
1048 #[inline]
1049 fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
1050 decoder.data.replace(data);
1051 decoder.start = 0;
1052 decoder.num_values = num_values;
1053 }
1054
1055 #[inline]
1056 fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
1057 assert!(decoder.type_length > 0);
1058
1059 let data = decoder
1060 .data
1061 .as_mut()
1062 .expect("set_data should have been called");
1063 let num_values = std::cmp::min(buffer.len(), decoder.num_values);
1064
1065 for item in buffer.iter_mut().take(num_values) {
1066 let len = decoder.type_length as usize;
1067
1068 if data.len() < decoder.start + len {
1069 return Err(eof_err!("Not enough bytes to decode"));
1070 }
1071
1072 item.set_data(data.slice(decoder.start..decoder.start + len));
1073 decoder.start += len;
1074 }
1075 decoder.num_values -= num_values;
1076
1077 Ok(num_values)
1078 }
1079
1080 fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
1081 assert!(decoder.type_length > 0);
1082
1083 let data = decoder
1084 .data
1085 .as_mut()
1086 .expect("set_data should have been called");
1087 let num_values = std::cmp::min(num_values, decoder.num_values);
1088 for _ in 0..num_values {
1089 let len = decoder.type_length as usize;
1090
1091 if data.len() < decoder.start + len {
1092 return Err(eof_err!("Not enough bytes to skip"));
1093 }
1094
1095 decoder.start += len;
1096 }
1097 decoder.num_values -= num_values;
1098
1099 Ok(num_values)
1100 }
1101
1102 #[inline]
1103 fn dict_encoding_size(&self) -> (usize, usize) {
1104 (std::mem::size_of::<u32>(), self.len())
1105 }
1106
1107 #[inline]
1108 fn as_any(&self) -> &dyn std::any::Any {
1109 self
1110 }
1111
1112 #[inline]
1113 fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
1114 self
1115 }
1116
1117 #[inline]
1118 fn set_from_bytes(&mut self, data: Bytes) {
1119 self.set_data(data);
1120 }
1121 }
1122
1123 impl HeapSize for super::FixedLenByteArray {
1124 fn heap_size(&self) -> usize {
1125 self.0.heap_size()
1126 }
1127 }
1128}
1129
/// Maps a Parquet physical type marker to its native value type and to the
/// matching typed column reader and writer variants.
pub trait DataType: 'static + Send {
    /// The native Rust type used to hold values of this physical type.
    type T: private::ParquetValueType;

    /// Returns the Parquet physical type, taken from `Self::T`.
    fn get_physical_type() -> Type {
        <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
    }

    /// Returns the size reported for this type (see the `make_type!`
    /// invocations for the concrete values).
    fn get_type_size() -> usize;

    /// Extracts the typed reader from a `ColumnReader`, or `None` when it
    /// holds a different variant.
    // NOTE(review): the parameter is named `column_writer` although it is a
    // `ColumnReader`; this looks like a copy-paste leftover.
    fn get_column_reader(column_writer: ColumnReader) -> Option<ColumnReaderImpl<Self>>
    where
        Self: Sized;

    /// Extracts the typed writer from a `ColumnWriter`, or `None` when it
    /// holds a different variant.
    fn get_column_writer(column_writer: ColumnWriter<'_>) -> Option<ColumnWriterImpl<'_, Self>>
    where
        Self: Sized;

    /// Borrows the typed writer from a `ColumnWriter`, or `None` when it holds
    /// a different variant.
    fn get_column_writer_ref<'a, 'b: 'a>(
        column_writer: &'b ColumnWriter<'a>,
    ) -> Option<&'b ColumnWriterImpl<'a, Self>>
    where
        Self: Sized;

    /// Mutably borrows the typed writer from a `ColumnWriter`, or `None` when
    /// it holds a different variant.
    fn get_column_writer_mut<'a, 'b: 'a>(
        column_writer: &'a mut ColumnWriter<'b>,
    ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
    where
        Self: Sized;
}
1168
// Deprecated marker trait for `DataType`s whose native type implements
// `SliceAsBytes`; it declares no items of its own.
#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray and nobody knows what's it for. Will be removed in 55.0.0"
)]
#[allow(missing_docs)]
pub trait SliceAsBytesDataType: DataType
where
    Self::T: SliceAsBytes,
{
}
1180
// Blanket implementation: every `DataType` whose native type implements
// `SliceAsBytes` is a `SliceAsBytesDataType`.
#[allow(deprecated)]
impl<T> SliceAsBytesDataType for T
where
    T: DataType,
    <T as DataType>::T: SliceAsBytes,
{
}
1188
// Declares the marker struct `$name` and implements `DataType` for it:
// `$native_ty` becomes the associated type `T`, `$size` is returned by
// `get_type_size`, and `$reader_ident` / `$writer_ident` name the
// `ColumnReader` / `ColumnWriter` variants that hold the typed implementation.
macro_rules! make_type {
    ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
        #[doc = concat!("Parquet physical type: ", stringify!($name))]
        #[derive(Clone)]
        pub struct $name {}

        impl DataType for $name {
            type T = $native_ty;

            fn get_type_size() -> usize {
                $size
            }

            // Each accessor returns `Some` only for the variant matching this type.
            fn get_column_reader(column_reader: ColumnReader) -> Option<ColumnReaderImpl<Self>> {
                match column_reader {
                    ColumnReader::$reader_ident(w) => Some(w),
                    _ => None,
                }
            }

            fn get_column_writer(
                column_writer: ColumnWriter<'_>,
            ) -> Option<ColumnWriterImpl<'_, Self>> {
                match column_writer {
                    ColumnWriter::$writer_ident(w) => Some(w),
                    _ => None,
                }
            }

            fn get_column_writer_ref<'a, 'b: 'a>(
                column_writer: &'a ColumnWriter<'b>,
            ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
                match column_writer {
                    ColumnWriter::$writer_ident(w) => Some(w),
                    _ => None,
                }
            }

            fn get_column_writer_mut<'a, 'b: 'a>(
                column_writer: &'a mut ColumnWriter<'b>,
            ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
                match column_writer {
                    ColumnWriter::$writer_ident(w) => Some(w),
                    _ => None,
                }
            }
        }
    };
}
1238
// One marker type per Parquet physical type. The last argument is the value
// returned by `get_type_size`: a literal for the primitives, and the in-memory
// size of the Rust struct for `Int96`, `ByteArray` and `FixedLenByteArray`.
make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
make_type!(
    Int96Type,
    Int96ColumnReader,
    Int96ColumnWriter,
    Int96,
    mem::size_of::<Int96>()
);
make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
make_type!(
    ByteArrayType,
    ByteArrayColumnReader,
    ByteArrayColumnWriter,
    ByteArray,
    mem::size_of::<ByteArray>()
);
make_type!(
    FixedLenByteArrayType,
    FixedLenByteArrayColumnReader,
    FixedLenByteArrayColumnWriter,
    FixedLenByteArray,
    mem::size_of::<FixedLenByteArray>()
);
1267
1268impl AsRef<[u8]> for ByteArray {
1269 fn as_ref(&self) -> &[u8] {
1270 self.as_bytes()
1271 }
1272}
1273
1274impl AsRef<[u8]> for FixedLenByteArray {
1275 fn as_ref(&self) -> &[u8] {
1276 self.as_bytes()
1277 }
1278}
1279
// Panics with the message `$err` unless `T::get_physical_type()` matches one
// of the given patterns. A type named `T` implementing `DataType` must be in
// scope at the call site.
macro_rules! ensure_phys_ty {
    ($($ty:pat_param)|+ , $err: literal) => {
        match T::get_physical_type() {
            $($ty => (),)*
            _ => panic!($err),
        };
    }
}
1289
#[cfg(test)]
mod tests {
    use super::*;

    // `as_bytes` exposes the stored bytes of each value type.
    #[test]
    fn test_as_bytes() {
        // The expected bytes below assume a little-endian target.
        let i96 = Int96::from(vec![1, 2, 3]);
        assert_eq!(i96.as_bytes(), &[1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]);

        let ba = ByteArray::from(vec![1, 2, 3]);
        assert_eq!(ba.as_bytes(), &[1, 2, 3]);

        // Integer-backed decimals store their value as big-endian bytes.
        let decimal = Decimal::from_i32(123, 5, 2);
        assert_eq!(decimal.as_bytes(), &[0, 0, 0, 123]);
        let decimal = Decimal::from_i64(123, 5, 2);
        assert_eq!(decimal.as_bytes(), &[0, 0, 0, 0, 0, 0, 0, 123]);
        let decimal = Decimal::from_bytes(ByteArray::from(vec![1, 2, 3]), 5, 2);
        assert_eq!(decimal.as_bytes(), &[1, 2, 3]);
    }

    // Building an `Int96` from a three-element vector keeps the word order.
    #[test]
    fn test_int96_from() {
        assert_eq!(
            Int96::from(vec![1, 12345, 1234567890]).data(),
            &[1, 12345, 1234567890]
        );
    }

    // Every `From` conversion yields the same bytes that were passed in.
    #[test]
    fn test_byte_array_from() {
        assert_eq!(ByteArray::from(b"ABC".to_vec()).data(), b"ABC");
        assert_eq!(ByteArray::from("ABC").data(), b"ABC");
        assert_eq!(
            ByteArray::from(Bytes::from(vec![1u8, 2u8, 3u8, 4u8, 5u8])).data(),
            &[1u8, 2u8, 3u8, 4u8, 5u8]
        );
        let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8];
        assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
    }

    // Decimal equality compares precision, scale and stored bytes, so values
    // in different variants are equal only when their bytes match exactly.
    #[test]
    fn test_decimal_partial_eq() {
        assert_eq!(Decimal::default(), Decimal::from_i32(0, 0, 0));
        assert_eq!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 2));
        assert_eq!(
            Decimal::from_bytes(ByteArray::from(vec![0, 0, 0, 3]), 5, 2),
            Decimal::from_i32(3, 5, 2)
        );

        assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(111, 5, 2));
        assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(222, 6, 2));
        assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(222, 5, 3));

        // Same numeric value, but 8 stored bytes versus 4.
        assert!(Decimal::from_i64(222, 5, 2) != Decimal::from_i32(222, 5, 2));
    }

    // Byte arrays order lexicographically by their bytes.
    #[test]
    fn test_byte_array_ord() {
        let ba1 = ByteArray::from(vec![1, 2, 3]);
        let ba11 = ByteArray::from(vec![1, 2, 3]);
        let ba2 = ByteArray::from(vec![3, 4]);
        let ba3 = ByteArray::from(vec![1, 2, 4]);
        let ba4 = ByteArray::from(vec![]);
        let ba5 = ByteArray::from(vec![2, 2, 3]);

        assert!(ba1 < ba2);
        assert!(ba3 > ba1);
        assert!(ba1 > ba4);
        assert_eq!(ba1, ba11);
        assert!(ba5 > ba1);
    }
}