use bytes::Bytes;
use half::f16;
use std::cmp::Ordering;
use std::fmt;
use std::mem;
use std::ops::{Deref, DerefMut};
use std::str::from_utf8;
use crate::basic::Type;
use crate::column::reader::{ColumnReader, ColumnReaderImpl};
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
use crate::util::bit_util::FromBytes;
/// Rust representation of the Parquet `INT96` physical type, stored as three `u32` words.
///
/// The timestamp helpers on this type read word `2` as a Julian day number and words `1`
/// (high half) and `0` (low half) as a 64-bit count of nanoseconds within that day.
///
/// Note that the derived `PartialOrd` compares the words in index order (word `0` first),
/// which is not the same as chronological order.
#[derive(Clone, Copy, Debug, PartialOrd, Default, PartialEq, Eq)]
pub struct Int96 {
    value: [u32; 3],
}

impl Int96 {
    /// Creates an `Int96` whose three words are all zero.
    pub fn new() -> Self {
        Self { value: [0, 0, 0] }
    }

    /// Returns the three underlying words as a slice.
    #[inline]
    pub fn data(&self) -> &[u32] {
        &self.value[..]
    }

    /// Overwrites the three words with `elem0`, `elem1` and `elem2`, in that order.
    #[inline]
    pub fn set_data(&mut self, elem0: u32, elem1: u32, elem2: u32) {
        self.value[0] = elem0;
        self.value[1] = elem1;
        self.value[2] = elem2;
    }

    /// Converts this value to milliseconds relative to the epoch day
    /// (Julian day 2,440,588). Sub-millisecond nanoseconds are truncated.
    pub fn to_i64(&self) -> i64 {
        const MILLIS_PER_SECOND: i64 = 1_000;
        const NANOS_PER_MILLI: i64 = 1_000_000;

        let (secs, nanos) = self.to_seconds_and_nanos();
        secs * MILLIS_PER_SECOND + nanos / NANOS_PER_MILLI
    }

    /// Converts this value to nanoseconds relative to the epoch day
    /// (Julian day 2,440,588).
    ///
    /// The arithmetic wraps on overflow instead of panicking.
    pub fn to_nanos(&self) -> i64 {
        const NANOS_PER_SECOND: i64 = 1_000_000_000;

        let (secs, nanos) = self.to_seconds_and_nanos();
        let whole_seconds_in_nanos = secs.wrapping_mul(NANOS_PER_SECOND);
        whole_seconds_in_nanos.wrapping_add(nanos)
    }

    /// Splits this value into `(seconds, nanoseconds)`.
    ///
    /// `seconds` is the number of whole days since Julian day 2,440,588 expressed in
    /// seconds; `nanoseconds` is the 64-bit value formed from words `1` (high) and `0` (low).
    pub fn to_seconds_and_nanos(&self) -> (i64, i64) {
        const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
        const SECONDS_PER_DAY: i64 = 86_400;

        let [nanos_lo, nanos_hi, julian_day] = self.value;
        let nanoseconds = (i64::from(nanos_hi) << 32) + i64::from(nanos_lo);
        let seconds = (i64::from(julian_day) - JULIAN_DAY_OF_EPOCH) * SECONDS_PER_DAY;

        (seconds, nanoseconds)
    }
}
/// Builds an `Int96` from a vector holding exactly three words.
///
/// # Panics
///
/// Panics if `buf` does not contain exactly three elements.
impl From<Vec<u32>> for Int96 {
    fn from(buf: Vec<u32>) -> Self {
        assert_eq!(buf.len(), 3);
        Self {
            value: [buf[0], buf[1], buf[2]],
        }
    }
}
/// Displays the value as the `Debug` rendering of its three words.
impl fmt::Display for Int96 {
    #[cold]
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let words = self.data();
        f.write_fmt(format_args!("{:?}", words))
    }
}
/// Rust representation of a Parquet byte array, backed by an optional [`Bytes`] buffer.
///
/// `data` starts out as `None` (see `new` / `Default`) and is populated through `set_data`
/// or one of the `From` conversions. Accessors such as `data()` and `len()` panic while the
/// buffer is unset.
#[derive(Clone, Default)]
pub struct ByteArray {
data: Option<Bytes>,
}
/// Debug output shows the contents as a string when they are valid UTF-8, and falls back to
/// the raw optional buffer otherwise (including when no buffer is set).
impl std::fmt::Debug for ByteArray {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("ByteArray");
        if let Ok(text) = self.as_utf8() {
            builder.field("data", &text);
        } else {
            builder.field("data", &self.data);
        }
        builder.finish()
    }
}
/// Orders byte arrays by their contents; an unset array sorts before any array with data.
impl PartialOrd for ByteArray {
    fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
        // `Option<&[u8]>` already orders `None` before `Some(_)` and compares two `Some`
        // payloads as byte slices, which is exactly the ordering wanted here.
        let lhs: Option<&[u8]> = self.data.as_deref();
        let rhs: Option<&[u8]> = other.data.as_deref();
        lhs.partial_cmp(&rhs)
    }
}
impl ByteArray {
    /// Creates a byte array with no underlying buffer.
    #[inline]
    pub fn new() -> Self {
        Self { data: None }
    }

    /// Returns the number of bytes in the underlying buffer.
    ///
    /// # Panics
    ///
    /// Panics if no buffer has been set.
    #[inline]
    pub fn len(&self) -> usize {
        assert!(self.data.is_some());
        // The assertion above guarantees that `data()` cannot hit its own panic path.
        self.data().len()
    }

    /// Returns `true` when the underlying buffer holds zero bytes.
    ///
    /// # Panics
    ///
    /// Panics if no buffer has been set (see [`ByteArray::len`]).
    #[inline]
    pub fn is_empty(&self) -> bool {
        let byte_count = self.len();
        byte_count == 0
    }

    /// Returns the contents as a byte slice.
    ///
    /// # Panics
    ///
    /// Panics if no buffer has been set.
    #[inline]
    pub fn data(&self) -> &[u8] {
        let bytes: &Bytes = self
            .data
            .as_ref()
            .expect("set_data should have been called");
        bytes.as_ref()
    }

    /// Replaces the underlying buffer with `data`.
    #[inline]
    pub fn set_data(&mut self, data: Bytes) {
        self.data = Some(data);
    }

    /// Returns a new `ByteArray` covering `len` bytes of this one, starting at `start`.
    ///
    /// # Panics
    ///
    /// Panics if no buffer has been set, or if `Bytes::slice` rejects the range.
    #[inline]
    pub fn slice(&self, start: usize, len: usize) -> Self {
        let bytes = self
            .data
            .as_ref()
            .expect("set_data should have been called");
        Self {
            data: Some(bytes.slice(start..start + len)),
        }
    }

    /// Interprets the contents as UTF-8 text.
    ///
    /// # Errors
    ///
    /// Returns an error if no buffer has been set or if the bytes are not valid UTF-8.
    pub fn as_utf8(&self) -> Result<&str> {
        let bytes: &[u8] = self
            .data
            .as_deref()
            .ok_or_else(|| general_err!("Can't convert empty byte array to utf8"))?;
        Ok(from_utf8(bytes)?)
    }
}
impl From<Vec<u8>> for ByteArray {
fn from(buf: Vec<u8>) -> ByteArray {
Self {
data: Some(buf.into()),
}
}
}
/// Copies the borrowed bytes into a freshly allocated buffer.
impl<'a> From<&'a [u8]> for ByteArray {
    fn from(b: &'a [u8]) -> ByteArray {
        let owned: Vec<u8> = b.to_vec();
        Self {
            data: Some(owned.into()),
        }
    }
}
/// Copies the UTF-8 bytes of the string into a freshly allocated buffer.
impl<'a> From<&'a str> for ByteArray {
    fn from(s: &'a str) -> ByteArray {
        let owned: Vec<u8> = s.as_bytes().to_vec();
        Self {
            data: Some(owned.into()),
        }
    }
}
impl From<Bytes> for ByteArray {
fn from(value: Bytes) -> Self {
Self { data: Some(value) }
}
}
/// Stores the little-endian byte representation of the half-precision float.
impl From<f16> for ByteArray {
    fn from(value: f16) -> Self {
        let le_bytes = value.to_le_bytes();
        Self::from(le_bytes.as_slice())
    }
}
/// Two byte arrays are equal when both are unset, or both are set and hold the same bytes.
impl PartialEq for ByteArray {
    fn eq(&self, other: &ByteArray) -> bool {
        // Comparing the optional byte slices covers all four set/unset combinations.
        let lhs: Option<&[u8]> = self.data.as_deref();
        let rhs: Option<&[u8]> = other.data.as_deref();
        lhs == rhs
    }
}
/// Displays the contents as the `Debug` rendering of the byte slice.
///
/// Formatting panics if no buffer has been set, because it goes through `data()`.
impl fmt::Display for ByteArray {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let bytes = self.data();
        f.write_fmt(format_args!("{:?}", bytes))
    }
}
/// Newtype around [`ByteArray`] used for the Parquet `FIXED_LEN_BYTE_ARRAY` physical type.
///
/// It dereferences to `ByteArray`, so all `ByteArray` accessors are available on it. The
/// wrapper itself does not store or enforce a length.
#[repr(transparent)]
#[derive(Clone, Debug, Default)]
pub struct FixedLenByteArray(ByteArray);
/// Equality is delegated to the wrapped `ByteArray`.
impl PartialEq for FixedLenByteArray {
    fn eq(&self, other: &FixedLenByteArray) -> bool {
        self.0 == other.0
    }
}
/// Compares the wrapped `ByteArray` against a plain `ByteArray`.
impl PartialEq<ByteArray> for FixedLenByteArray {
    fn eq(&self, other: &ByteArray) -> bool {
        self.0 == *other
    }
}
/// Compares a plain `ByteArray` against the `ByteArray` wrapped by a `FixedLenByteArray`.
impl PartialEq<FixedLenByteArray> for ByteArray {
    fn eq(&self, other: &FixedLenByteArray) -> bool {
        *self == other.0
    }
}
/// Displays exactly as the wrapped `ByteArray` does.
impl fmt::Display for FixedLenByteArray {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
/// Ordering is delegated to the wrapped `ByteArray`.
impl PartialOrd for FixedLenByteArray {
    fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
        PartialOrd::partial_cmp(&self.0, &other.0)
    }
}
/// Orders a plain `ByteArray` against the `ByteArray` wrapped by a `FixedLenByteArray`.
impl PartialOrd<FixedLenByteArray> for ByteArray {
    fn partial_cmp(&self, other: &FixedLenByteArray) -> Option<Ordering> {
        PartialOrd::<ByteArray>::partial_cmp(self, &other.0)
    }
}
/// Orders the wrapped `ByteArray` against a plain `ByteArray`.
impl PartialOrd<ByteArray> for FixedLenByteArray {
    fn partial_cmp(&self, other: &ByteArray) -> Option<Ordering> {
        PartialOrd::<ByteArray>::partial_cmp(&self.0, other)
    }
}
impl Deref for FixedLenByteArray {
type Target = ByteArray;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for FixedLenByteArray {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl From<ByteArray> for FixedLenByteArray {
fn from(other: ByteArray) -> Self {
Self(other)
}
}
impl From<Vec<u8>> for FixedLenByteArray {
fn from(buf: Vec<u8>) -> FixedLenByteArray {
FixedLenByteArray(ByteArray::from(buf))
}
}
impl From<FixedLenByteArray> for ByteArray {
fn from(other: FixedLenByteArray) -> Self {
other.0
}
}
/// Rust representation of a decimal value together with its precision and scale.
///
/// The unscaled value is held as raw bytes. The `from_i32` / `from_i64` constructors store
/// the integer in big-endian byte order; `from_bytes` stores the supplied `ByteArray` as is.
#[derive(Clone, Debug)]
pub enum Decimal {
/// Decimal backed by the four bytes of an `i32`.
Int32 {
value: [u8; 4],
precision: i32,
scale: i32,
},
/// Decimal backed by the eight bytes of an `i64`.
Int64 {
value: [u8; 8],
precision: i32,
scale: i32,
},
/// Decimal backed by an arbitrary-length byte array.
Bytes {
value: ByteArray,
precision: i32,
scale: i32,
},
}
impl Decimal {
    /// Creates a decimal from an `i32`, storing the value as big-endian bytes.
    pub fn from_i32(value: i32, precision: i32, scale: i32) -> Self {
        Decimal::Int32 {
            value: value.to_be_bytes(),
            precision,
            scale,
        }
    }

    /// Creates a decimal from an `i64`, storing the value as big-endian bytes.
    pub fn from_i64(value: i64, precision: i32, scale: i32) -> Self {
        Decimal::Int64 {
            value: value.to_be_bytes(),
            precision,
            scale,
        }
    }

    /// Creates a decimal from a byte array, which is stored unchanged.
    pub fn from_bytes(value: ByteArray, precision: i32, scale: i32) -> Self {
        Decimal::Bytes {
            value,
            precision,
            scale,
        }
    }

    /// Returns the bytes of the unscaled value.
    ///
    /// # Panics
    ///
    /// For the `Bytes` variant this panics if the wrapped `ByteArray` has no buffer set.
    pub fn data(&self) -> &[u8] {
        match self {
            Decimal::Int32 { value, .. } => &value[..],
            Decimal::Int64 { value, .. } => &value[..],
            Decimal::Bytes { value, .. } => value.data(),
        }
    }

    /// Returns the precision this decimal was constructed with.
    pub fn precision(&self) -> i32 {
        match self {
            Decimal::Int32 { precision, .. }
            | Decimal::Int64 { precision, .. }
            | Decimal::Bytes { precision, .. } => *precision,
        }
    }

    /// Returns the scale this decimal was constructed with.
    pub fn scale(&self) -> i32 {
        match self {
            Decimal::Int32 { scale, .. }
            | Decimal::Int64 { scale, .. }
            | Decimal::Bytes { scale, .. } => *scale,
        }
    }
}
impl Default for Decimal {
fn default() -> Self {
Self::from_i32(0, 0, 0)
}
}
/// Decimals are equal when precision, scale and raw bytes all match, regardless of variant.
impl PartialEq for Decimal {
    fn eq(&self, other: &Decimal) -> bool {
        // Checked in this order so that `data()` is only reached when precision and scale
        // already agree.
        if self.precision() != other.precision() {
            return false;
        }
        if self.scale() != other.scale() {
            return false;
        }
        self.data() == other.data()
    }
}
/// Converts an instance of a data type to a slice of bytes.
pub trait AsBytes {
/// Returns a byte-slice view of `self`.
fn as_bytes(&self) -> &[u8];
}
/// Converts a slice of a data type to a slice of bytes.
pub trait SliceAsBytes: Sized {
/// Returns a byte-slice view of the whole slice `self_`.
fn slice_as_bytes(self_: &[Self]) -> &[u8];
/// Returns a mutable byte-slice view of the whole slice `self_`.
///
/// # Safety
///
/// NOTE(review): presumably unsafe because bytes written through the returned slice must
/// leave every element a valid `Self` - confirm the intended contract.
unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8];
}
/// A byte slice is its own byte view.
impl AsBytes for [u8] {
    fn as_bytes(&self) -> &[u8] {
        &self[..]
    }
}
/// Implements `AsBytes` and `SliceAsBytes` for the given type by reinterpreting its memory
/// as bytes in native byte order.
macro_rules! gen_as_bytes {
    ($prim:ident) => {
        impl AsBytes for $prim {
            #[allow(clippy::size_of_in_element_count)]
            fn as_bytes(&self) -> &[u8] {
                let start = (self as *const Self).cast::<u8>();
                let byte_len = std::mem::size_of::<Self>();
                // SAFETY: `start` comes from a live reference and `byte_len` is exactly the
                // size of the referenced value.
                unsafe { std::slice::from_raw_parts(start, byte_len) }
            }
        }

        impl SliceAsBytes for $prim {
            #[inline]
            #[allow(clippy::size_of_in_element_count)]
            fn slice_as_bytes(self_: &[Self]) -> &[u8] {
                let byte_len = std::mem::size_of_val(self_);
                let start = self_.as_ptr().cast::<u8>();
                // SAFETY: `start` and `byte_len` describe exactly the memory of `self_`.
                unsafe { std::slice::from_raw_parts(start, byte_len) }
            }

            #[inline]
            #[allow(clippy::size_of_in_element_count)]
            unsafe fn slice_as_bytes_mut(self_: &mut [Self]) -> &mut [u8] {
                let byte_len = std::mem::size_of_val(self_);
                let start = self_.as_mut_ptr().cast::<u8>();
                // SAFETY: `start` and `byte_len` describe exactly the memory of `self_`,
                // which is exclusively borrowed for the lifetime of the returned slice.
                unsafe { std::slice::from_raw_parts_mut(start, byte_len) }
            }
        }
    };
}
// Byte views for the primitive integer and floating-point types.
gen_as_bytes!(i8);
gen_as_bytes!(i16);
gen_as_bytes!(i32);
gen_as_bytes!(i64);
gen_as_bytes!(u8);
gen_as_bytes!(u16);
gen_as_bytes!(u32);
gen_as_bytes!(u64);
gen_as_bytes!(f32);
gen_as_bytes!(f64);
/// Implements `SliceAsBytes` for a type with both methods panicking via `unimplemented!()`.
///
/// This satisfies the trait bound for types that have no slice-level byte view.
macro_rules! unimplemented_slice_as_bytes {
    ($unsupported: ty) => {
        impl SliceAsBytes for $unsupported {
            fn slice_as_bytes(_values: &[Self]) -> &[u8] {
                unimplemented!()
            }

            unsafe fn slice_as_bytes_mut(_values: &mut [Self]) -> &mut [u8] {
                unimplemented!()
            }
        }
    };
}
// These types get a panicking `SliceAsBytes` implementation.
unimplemented_slice_as_bytes!(Int96);
unimplemented_slice_as_bytes!(bool);
unimplemented_slice_as_bytes!(ByteArray);
unimplemented_slice_as_bytes!(FixedLenByteArray);
/// Views a `bool` as the single byte that stores it.
impl AsBytes for bool {
    fn as_bytes(&self) -> &[u8] {
        let start = (self as *const bool).cast::<u8>();
        // SAFETY: `start` comes from a live `&bool`, and a `bool` occupies exactly one byte.
        unsafe { std::slice::from_raw_parts(start, 1) }
    }
}
/// Views the three `u32` words as 12 bytes in native byte order.
impl AsBytes for Int96 {
    fn as_bytes(&self) -> &[u8] {
        let start = self.data().as_ptr().cast::<u8>();
        // SAFETY: `data()` is a slice of three `u32` words, i.e. exactly 12 readable bytes.
        unsafe { std::slice::from_raw_parts(start, 12) }
    }
}
impl AsBytes for ByteArray {
fn as_bytes(&self) -> &[u8] {
self.data()
}
}
/// The byte view is the wrapped array's contents; panics if no buffer has been set.
impl AsBytes for FixedLenByteArray {
    fn as_bytes(&self) -> &[u8] {
        self.0.data()
    }
}
impl AsBytes for Decimal {
fn as_bytes(&self) -> &[u8] {
self.data()
}
}
/// The byte view is the vector's contents.
impl AsBytes for Vec<u8> {
    fn as_bytes(&self) -> &[u8] {
        &self[..]
    }
}
/// The byte view is the string's UTF-8 bytes.
impl AsBytes for &str {
    fn as_bytes(&self) -> &[u8] {
        // Name the inherent `str` method explicitly so this does not recurse into the trait.
        str::as_bytes(self)
    }
}
/// The byte view is the string's UTF-8 bytes.
impl AsBytes for str {
    fn as_bytes(&self) -> &[u8] {
        // Name the inherent `str` method explicitly so this does not recurse into the trait.
        str::as_bytes(self)
    }
}
pub(crate) mod private {
use bytes::Bytes;
use crate::encodings::decoding::PlainDecoderDetails;
use crate::util::bit_util::{read_num_bytes, BitReader, BitWriter};
use super::{ParquetError, Result, SliceAsBytes};
use crate::basic::Type;
use crate::file::metadata::HeapSize;
/// Internal trait implemented by every native Rust type that backs a Parquet physical type.
///
/// It bundles the plain encode/decode/skip routines with a handful of conversion helpers.
/// The trait lives in a `pub(crate)` module, so it cannot be named from outside this crate.
pub trait ParquetValueType:
    PartialEq
    + std::fmt::Debug
    + std::fmt::Display
    + Default
    + Clone
    + super::AsBytes
    + super::FromBytes
    + SliceAsBytes
    + PartialOrd
    + Send
    + HeapSize
    + crate::encodings::decoding::private::GetDecoder
    + crate::file::statistics::private::MakeStatistics
{
    /// The Parquet physical type this Rust type corresponds to.
    const PHYSICAL_TYPE: Type;

    /// Encodes `values`, writing the output to `writer` or `bit_writer` depending on the
    /// implementation.
    fn encode<W: std::io::Write>(
        values: &[Self],
        writer: &mut W,
        bit_writer: &mut BitWriter,
    ) -> Result<()>;

    /// Points `decoder` at a new buffer `data` that holds `num_values` values.
    fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize);

    /// Decodes values from `decoder` into `buffer` and returns how many were decoded.
    fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize>;

    /// Skips up to `num_values` values in `decoder` and returns how many were skipped.
    fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize>;

    /// Returns a pair of sizes used for dictionary encoding; the default is
    /// `(size_of::<Self>(), 1)`.
    ///
    /// NOTE(review): the exact meaning of the two components is defined by the caller -
    /// confirm against the dictionary encoder.
    fn dict_encoding_size(&self) -> (usize, usize) {
        let fixed_width = std::mem::size_of::<Self>();
        (fixed_width, 1)
    }

    /// Returns the total byte length of `values` for types that override this method;
    /// the default returns `None`.
    fn variable_length_bytes(_: &[Self]) -> Option<i64> {
        None
    }

    /// Returns the value as an `i64`; the default implementation always returns an error.
    fn as_i64(&self) -> Result<i64> {
        Err(general_err!("Type cannot be converted to i64"))
    }

    /// Returns the value as a `u64` by casting the result of [`Self::as_i64`].
    ///
    /// Any error from `as_i64` is replaced with a `u64`-specific message.
    fn as_u64(&self) -> Result<u64> {
        match self.as_i64() {
            Ok(signed) => Ok(signed as u64),
            Err(_) => Err(general_err!("Type cannot be converted to u64")),
        }
    }

    /// Returns `self` as a `&dyn Any`.
    fn as_any(&self) -> &dyn std::any::Any;

    /// Returns `self` as a `&mut dyn Any`.
    fn as_mut_any(&mut self) -> &mut dyn std::any::Any;

    /// Replaces the value's contents with `data`.
    ///
    /// The default implementation panics with `unimplemented!()`.
    fn set_from_bytes(&mut self, _data: Bytes) {
        unimplemented!();
    }
}
/// Booleans are encoded and decoded one bit per value through the bit writer / bit reader.
impl ParquetValueType for bool {
    const PHYSICAL_TYPE: Type = Type::BOOLEAN;

    /// Writes each value as a single bit to `bit_writer`; the byte writer is unused.
    #[inline]
    fn encode<W: std::io::Write>(
        values: &[Self],
        _: &mut W,
        bit_writer: &mut BitWriter,
    ) -> Result<()> {
        values
            .iter()
            .for_each(|flag| bit_writer.put_value(u64::from(*flag), 1));
        Ok(())
    }

    /// Installs a fresh bit reader over `data` and records the number of available values.
    #[inline]
    fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
        decoder.bit_reader = Some(BitReader::new(data));
        decoder.num_values = num_values;
    }

    /// Reads one-bit values into `buffer`, limited by the number of values remaining.
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not installed a bit reader.
    #[inline]
    fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
        let reader = decoder.bit_reader.as_mut().unwrap();
        let limit = buffer.len().min(decoder.num_values);
        let values_read = reader.get_batch(&mut buffer[..limit], 1);
        decoder.num_values -= values_read;
        Ok(values_read)
    }

    /// Skips one-bit values, limited by the number of values remaining.
    ///
    /// # Panics
    ///
    /// Panics if `set_data` has not installed a bit reader.
    fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
        let reader = decoder.bit_reader.as_mut().unwrap();
        let limit = num_values.min(decoder.num_values);
        let values_skipped = reader.skip(limit, 1);
        decoder.num_values -= values_skipped;
        Ok(values_skipped)
    }

    /// Returns `1` for `true` and `0` for `false`.
    #[inline]
    fn as_i64(&self) -> Result<i64> {
        Ok(i64::from(*self))
    }

    #[inline]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    #[inline]
    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
        self
    }
}
/// Implements `ParquetValueType` for a fixed-width primitive by copying its raw in-memory
/// bytes to and from the data buffer.
///
/// Arguments: the Rust type, its Parquet physical type, and a `self => { ... }` block that
/// becomes the body of `as_i64`.
macro_rules! impl_from_raw {
    ($ty: ty, $physical_ty: expr, $self: ident => $as_i64: block) => {
        impl ParquetValueType for $ty {
            const PHYSICAL_TYPE: Type = $physical_ty;

            /// Writes the in-memory bytes of `values` to `writer`; the bit writer is unused.
            #[inline]
            fn encode<W: std::io::Write>(values: &[Self], writer: &mut W, _: &mut BitWriter) -> Result<()> {
                let byte_len = std::mem::size_of_val(values);
                let start = values.as_ptr().cast::<u8>();
                // SAFETY: `start` and `byte_len` describe exactly the memory of `values`.
                let raw = unsafe { std::slice::from_raw_parts(start, byte_len) };
                writer.write_all(raw)?;
                Ok(())
            }

            /// Installs `data` as the decode buffer and resets the read position.
            #[inline]
            fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
                decoder.data = Some(data);
                decoder.start = 0;
                decoder.num_values = num_values;
            }

            /// Copies up to `buffer.len()` values out of the decode buffer.
            ///
            /// Returns an EOF error if the buffer does not hold enough bytes.
            #[inline]
            fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
                let data = decoder.data.as_ref().expect("set_data should have been called");
                let num_values = buffer.len().min(decoder.num_values);
                let bytes_left = data.len() - decoder.start;
                let bytes_to_decode = std::mem::size_of::<Self>() * num_values;

                if bytes_left < bytes_to_decode {
                    return Err(eof_err!("Not enough bytes to decode"));
                }

                let end = decoder.start + bytes_to_decode;
                let source = data.slice(decoder.start..end);
                // SAFETY: NOTE(review): relies on `Self` being a plain primitive for which
                // any byte pattern written here is a valid value.
                let target = unsafe { Self::slice_as_bytes_mut(buffer) };
                target[..bytes_to_decode].copy_from_slice(source.as_ref());

                decoder.start = end;
                decoder.num_values -= num_values;

                Ok(num_values)
            }

            /// Advances the read position past up to `num_values` values.
            ///
            /// Returns an EOF error if the buffer does not hold enough bytes.
            #[inline]
            fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
                let data = decoder.data.as_ref().expect("set_data should have been called");
                let num_values = std::cmp::min(num_values, decoder.num_values);
                let bytes_left = data.len() - decoder.start;
                let bytes_to_skip = std::mem::size_of::<Self>() * num_values;

                if bytes_left < bytes_to_skip {
                    return Err(eof_err!("Not enough bytes to skip"));
                }

                decoder.start += bytes_to_skip;
                decoder.num_values -= num_values;

                Ok(num_values)
            }

            #[inline]
            fn as_i64(&$self) -> Result<i64> {
                $as_i64
            }

            #[inline]
            fn as_any(&self) -> &dyn std::any::Any {
                self
            }

            #[inline]
            fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
                self
            }
        }
    }
}
// Raw-copy implementations for the fixed-width numeric types. Integers convert to `i64`
// directly; the floating-point types report an error from `as_i64`.
impl_from_raw!(i32, Type::INT32, self => { Ok(*self as i64) });
impl_from_raw!(i64, Type::INT64, self => { Ok(*self) });
impl_from_raw!(f32, Type::FLOAT, self => { Err(general_err!("Type cannot be converted to i64")) });
impl_from_raw!(f64, Type::DOUBLE, self => { Err(general_err!("Type cannot be converted to i64")) });
impl ParquetValueType for super::Int96 {
const PHYSICAL_TYPE: Type = Type::INT96;
#[inline]
fn encode<W: std::io::Write>(
values: &[Self],
writer: &mut W,
_: &mut BitWriter,
) -> Result<()> {
for value in values {
let raw = SliceAsBytes::slice_as_bytes(value.data());
writer.write_all(raw)?;
}
Ok(())
}
#[inline]
fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
decoder.data.replace(data);
decoder.start = 0;
decoder.num_values = num_values;
}
#[inline]
fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
let data = decoder
.data
.as_ref()
.expect("set_data should have been called");
let num_values = std::cmp::min(buffer.len(), decoder.num_values);
let bytes_left = data.len() - decoder.start;
let bytes_to_decode = 12 * num_values;
if bytes_left < bytes_to_decode {
return Err(eof_err!("Not enough bytes to decode"));
}
let data_range = data.slice(decoder.start..decoder.start + bytes_to_decode);
let bytes: &[u8] = &data_range;
decoder.start += bytes_to_decode;
let mut pos = 0; for item in buffer.iter_mut().take(num_values) {
let elem0 = u32::from_le_bytes(bytes[pos..pos + 4].try_into().unwrap());
let elem1 = u32::from_le_bytes(bytes[pos + 4..pos + 8].try_into().unwrap());
let elem2 = u32::from_le_bytes(bytes[pos + 8..pos + 12].try_into().unwrap());
item.set_data(elem0, elem1, elem2);
pos += 12;
}
decoder.num_values -= num_values;
Ok(num_values)
}
fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let data = decoder
.data
.as_ref()
.expect("set_data should have been called");
let num_values = std::cmp::min(num_values, decoder.num_values);
let bytes_left = data.len() - decoder.start;
let bytes_to_skip = 12 * num_values;
if bytes_left < bytes_to_skip {
return Err(eof_err!("Not enough bytes to skip"));
}
decoder.start += bytes_to_skip;
decoder.num_values -= num_values;
Ok(num_values)
}
#[inline]
fn as_any(&self) -> &dyn std::any::Any {
self
}
#[inline]
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}
}
// `Int96` stores its three words inline, so it reports no heap usage.
impl HeapSize for super::Int96 {
fn heap_size(&self) -> usize {
0 }
}
impl ParquetValueType for super::ByteArray {
const PHYSICAL_TYPE: Type = Type::BYTE_ARRAY;
#[inline]
fn encode<W: std::io::Write>(
values: &[Self],
writer: &mut W,
_: &mut BitWriter,
) -> Result<()> {
for value in values {
let len: u32 = value.len().try_into().unwrap();
writer.write_all(&len.to_ne_bytes())?;
let raw = value.data();
writer.write_all(raw)?;
}
Ok(())
}
#[inline]
fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
decoder.data.replace(data);
decoder.start = 0;
decoder.num_values = num_values;
}
#[inline]
fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
let data = decoder
.data
.as_mut()
.expect("set_data should have been called");
let num_values = std::cmp::min(buffer.len(), decoder.num_values);
for val_array in buffer.iter_mut().take(num_values) {
let len: usize =
read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
decoder.start += std::mem::size_of::<u32>();
if data.len() < decoder.start + len {
return Err(eof_err!("Not enough bytes to decode"));
}
val_array.set_data(data.slice(decoder.start..decoder.start + len));
decoder.start += len;
}
decoder.num_values -= num_values;
Ok(num_values)
}
fn variable_length_bytes(values: &[Self]) -> Option<i64> {
Some(values.iter().map(|x| x.len() as i64).sum())
}
fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let data = decoder
.data
.as_mut()
.expect("set_data should have been called");
let num_values = num_values.min(decoder.num_values);
for _ in 0..num_values {
let len: usize =
read_num_bytes::<u32>(4, data.slice(decoder.start..).as_ref()) as usize;
decoder.start += std::mem::size_of::<u32>() + len;
}
decoder.num_values -= num_values;
Ok(num_values)
}
#[inline]
fn dict_encoding_size(&self) -> (usize, usize) {
(std::mem::size_of::<u32>(), self.len())
}
#[inline]
fn as_any(&self) -> &dyn std::any::Any {
self
}
#[inline]
fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}
#[inline]
fn set_from_bytes(&mut self, data: Bytes) {
self.set_data(data);
}
}
/// Reports the length of the backing buffer, or zero when no buffer is set.
impl HeapSize for super::ByteArray {
    fn heap_size(&self) -> usize {
        match &self.data {
            Some(bytes) => bytes.len(),
            None => 0,
        }
    }
}
/// Fixed-length byte arrays are stored back to back with no length prefix; the width comes
/// from `decoder.type_length`.
impl ParquetValueType for super::FixedLenByteArray {
    const PHYSICAL_TYPE: Type = Type::FIXED_LEN_BYTE_ARRAY;

    /// Writes the bytes of every value to `writer`; the bit writer is unused.
    ///
    /// # Panics
    ///
    /// Panics if a value has no buffer set.
    #[inline]
    fn encode<W: std::io::Write>(
        values: &[Self],
        writer: &mut W,
        _: &mut BitWriter,
    ) -> Result<()> {
        values
            .iter()
            .try_for_each(|value| writer.write_all(value.data()))?;
        Ok(())
    }

    /// Installs `data` as the decode buffer and resets the read position.
    #[inline]
    fn set_data(decoder: &mut PlainDecoderDetails, data: Bytes, num_values: usize) {
        decoder.data = Some(data);
        decoder.start = 0;
        decoder.num_values = num_values;
    }

    /// Decodes up to `buffer.len()` values of `decoder.type_length` bytes each.
    ///
    /// Each decoded value is a slice of the decode buffer. Returns an EOF error as soon as a
    /// value would run past the end of the buffer.
    ///
    /// # Panics
    ///
    /// Panics if `decoder.type_length` is not positive.
    #[inline]
    fn decode(buffer: &mut [Self], decoder: &mut PlainDecoderDetails) -> Result<usize> {
        assert!(decoder.type_length > 0);

        let data = decoder
            .data
            .as_mut()
            .expect("set_data should have been called");
        let num_values = buffer.len().min(decoder.num_values);
        let width = decoder.type_length as usize;

        for item in buffer[..num_values].iter_mut() {
            let end = decoder.start + width;
            if data.len() < end {
                return Err(eof_err!("Not enough bytes to decode"));
            }

            item.set_data(data.slice(decoder.start..end));
            decoder.start = end;
        }
        decoder.num_values -= num_values;

        Ok(num_values)
    }

    /// Advances the read position past up to `num_values` values.
    ///
    /// Returns an EOF error as soon as a value would run past the end of the buffer.
    ///
    /// # Panics
    ///
    /// Panics if `decoder.type_length` is not positive.
    fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
        assert!(decoder.type_length > 0);

        let data = decoder
            .data
            .as_mut()
            .expect("set_data should have been called");
        let num_values = num_values.min(decoder.num_values);
        let width = decoder.type_length as usize;

        for _ in 0..num_values {
            let end = decoder.start + width;
            if data.len() < end {
                return Err(eof_err!("Not enough bytes to skip"));
            }

            decoder.start = end;
        }
        decoder.num_values -= num_values;

        Ok(num_values)
    }

    /// Returns `(size_of::<u32>(), self.len())`.
    #[inline]
    fn dict_encoding_size(&self) -> (usize, usize) {
        let payload_len = self.len();
        (std::mem::size_of::<u32>(), payload_len)
    }

    #[inline]
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    #[inline]
    fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
        self
    }

    /// Replaces the contents with `data`.
    #[inline]
    fn set_from_bytes(&mut self, data: Bytes) {
        self.set_data(data);
    }
}
impl HeapSize for super::FixedLenByteArray {
fn heap_size(&self) -> usize {
self.0.heap_size()
}
}
}
/// Ties a Parquet physical type to its native Rust value type and to the matching typed
/// column reader and writer.
pub trait DataType: 'static + Send {
    /// The native Rust type used to hold values of this Parquet type.
    type T: private::ParquetValueType;

    /// Returns the Parquet physical type, taken from `T::PHYSICAL_TYPE`.
    fn get_physical_type() -> Type {
        <Self::T as private::ParquetValueType>::PHYSICAL_TYPE
    }

    /// Returns the size associated with this type.
    ///
    /// The implementations generated in this file supply either a literal byte count or
    /// `mem::size_of` of the native type.
    fn get_type_size() -> usize;

    /// Extracts the typed reader for this data type from `column_reader`, or returns `None`
    /// if the reader is for a different type.
    fn get_column_reader(column_reader: ColumnReader) -> Option<ColumnReaderImpl<Self>>
    where
        Self: Sized;

    /// Extracts the typed writer for this data type from `column_writer`, or returns `None`
    /// if the writer is for a different type.
    fn get_column_writer(column_writer: ColumnWriter<'_>) -> Option<ColumnWriterImpl<'_, Self>>
    where
        Self: Sized;

    /// Borrowing variant of [`DataType::get_column_writer`].
    ///
    /// NOTE(review): the lifetimes here are spelled differently from the implementations
    /// generated by `make_type!` (`&'b ColumnWriter<'a>` versus `&'a ColumnWriter<'b>`);
    /// confirm whether that is intentional before changing either side.
    fn get_column_writer_ref<'a, 'b: 'a>(
        column_writer: &'b ColumnWriter<'a>,
    ) -> Option<&'b ColumnWriterImpl<'a, Self>>
    where
        Self: Sized;

    /// Mutably borrowing variant of [`DataType::get_column_writer`].
    fn get_column_writer_mut<'a, 'b: 'a>(
        column_writer: &'a mut ColumnWriter<'b>,
    ) -> Option<&'a mut ColumnWriterImpl<'b, Self>>
    where
        Self: Sized;
}
// Deprecated marker trait: a `DataType` whose native type implements `SliceAsBytes`.
// It declares no items of its own.
#[deprecated(
since = "54.0.0",
note = "Seems like a stray and nobody knows what's it for. Will be removed in 55.0.0"
)]
#[allow(missing_docs)]
pub trait SliceAsBytesDataType: DataType
where
Self::T: SliceAsBytes,
{
}
// Blanket implementation of the deprecated marker trait for every qualifying `DataType`.
#[allow(deprecated)]
impl<T> SliceAsBytesDataType for T
where
T: DataType,
<T as DataType>::T: SliceAsBytes,
{
}
/// Declares a marker struct for one Parquet physical type and implements `DataType` for it.
///
/// Arguments: the struct name, the `ColumnReader` variant, the `ColumnWriter` variant, the
/// native Rust type, and the expression returned by `get_type_size`.
macro_rules! make_type {
    ($name:ident, $reader_ident: ident, $writer_ident: ident, $native_ty:ty, $size:expr) => {
        #[doc = concat!("Parquet physical type: ", stringify!($name))]
        #[derive(Clone)]
        pub struct $name {}

        impl DataType for $name {
            type T = $native_ty;

            fn get_type_size() -> usize {
                $size
            }

            fn get_column_reader(column_reader: ColumnReader) -> Option<ColumnReaderImpl<Self>> {
                if let ColumnReader::$reader_ident(typed) = column_reader {
                    Some(typed)
                } else {
                    None
                }
            }

            fn get_column_writer(
                column_writer: ColumnWriter<'_>,
            ) -> Option<ColumnWriterImpl<'_, Self>> {
                if let ColumnWriter::$writer_ident(typed) = column_writer {
                    Some(typed)
                } else {
                    None
                }
            }

            fn get_column_writer_ref<'a, 'b: 'a>(
                column_writer: &'a ColumnWriter<'b>,
            ) -> Option<&'a ColumnWriterImpl<'b, Self>> {
                if let ColumnWriter::$writer_ident(typed) = column_writer {
                    Some(typed)
                } else {
                    None
                }
            }

            fn get_column_writer_mut<'a, 'b: 'a>(
                column_writer: &'a mut ColumnWriter<'b>,
            ) -> Option<&'a mut ColumnWriterImpl<'b, Self>> {
                if let ColumnWriter::$writer_ident(typed) = column_writer {
                    Some(typed)
                } else {
                    None
                }
            }
        }
    };
}
// One marker type per Parquet physical type. The fixed-width primitives pass a literal
// size; the struct-backed types pass the in-memory size of their Rust representation.
make_type!(BoolType, BoolColumnReader, BoolColumnWriter, bool, 1);
make_type!(Int32Type, Int32ColumnReader, Int32ColumnWriter, i32, 4);
make_type!(Int64Type, Int64ColumnReader, Int64ColumnWriter, i64, 8);
make_type!(
Int96Type,
Int96ColumnReader,
Int96ColumnWriter,
Int96,
mem::size_of::<Int96>()
);
make_type!(FloatType, FloatColumnReader, FloatColumnWriter, f32, 4);
make_type!(DoubleType, DoubleColumnReader, DoubleColumnWriter, f64, 8);
make_type!(
ByteArrayType,
ByteArrayColumnReader,
ByteArrayColumnWriter,
ByteArray,
mem::size_of::<ByteArray>()
);
make_type!(
FixedLenByteArrayType,
FixedLenByteArrayColumnReader,
FixedLenByteArrayColumnWriter,
FixedLenByteArray,
mem::size_of::<FixedLenByteArray>()
);
impl AsRef<[u8]> for ByteArray {
fn as_ref(&self) -> &[u8] {
self.as_bytes()
}
}
impl AsRef<[u8]> for FixedLenByteArray {
fn as_ref(&self) -> &[u8] {
self.as_bytes()
}
}
// Panics with `$err` unless `T::get_physical_type()` matches one of the given patterns.
//
// The expansion refers to a type named `T`, so it must be invoked where a type (typically a
// generic parameter) called `T` with a `get_physical_type` function is in scope.
macro_rules! ensure_phys_ty {
($($ty:pat_param)|+ , $err: literal) => {
match T::get_physical_type() {
$($ty => (),)*
_ => panic!($err),
};
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_as_bytes() {
let i96 = Int96::from(vec![1, 2, 3]);
assert_eq!(i96.as_bytes(), &[1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0]);
let ba = ByteArray::from(vec![1, 2, 3]);
assert_eq!(ba.as_bytes(), &[1, 2, 3]);
let decimal = Decimal::from_i32(123, 5, 2);
assert_eq!(decimal.as_bytes(), &[0, 0, 0, 123]);
let decimal = Decimal::from_i64(123, 5, 2);
assert_eq!(decimal.as_bytes(), &[0, 0, 0, 0, 0, 0, 0, 123]);
let decimal = Decimal::from_bytes(ByteArray::from(vec![1, 2, 3]), 5, 2);
assert_eq!(decimal.as_bytes(), &[1, 2, 3]);
}
#[test]
fn test_int96_from() {
assert_eq!(
Int96::from(vec![1, 12345, 1234567890]).data(),
&[1, 12345, 1234567890]
);
}
#[test]
fn test_byte_array_from() {
assert_eq!(ByteArray::from(b"ABC".to_vec()).data(), b"ABC");
assert_eq!(ByteArray::from("ABC").data(), b"ABC");
assert_eq!(
ByteArray::from(Bytes::from(vec![1u8, 2u8, 3u8, 4u8, 5u8])).data(),
&[1u8, 2u8, 3u8, 4u8, 5u8]
);
let buf = vec![6u8, 7u8, 8u8, 9u8, 10u8];
assert_eq!(ByteArray::from(buf).data(), &[6u8, 7u8, 8u8, 9u8, 10u8]);
}
#[test]
fn test_decimal_partial_eq() {
assert_eq!(Decimal::default(), Decimal::from_i32(0, 0, 0));
assert_eq!(Decimal::from_i32(222, 5, 2), Decimal::from_i32(222, 5, 2));
assert_eq!(
Decimal::from_bytes(ByteArray::from(vec![0, 0, 0, 3]), 5, 2),
Decimal::from_i32(3, 5, 2)
);
assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(111, 5, 2));
assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(222, 6, 2));
assert!(Decimal::from_i32(222, 5, 2) != Decimal::from_i32(222, 5, 3));
assert!(Decimal::from_i64(222, 5, 2) != Decimal::from_i32(222, 5, 2));
}
#[test]
fn test_byte_array_ord() {
let ba1 = ByteArray::from(vec![1, 2, 3]);
let ba11 = ByteArray::from(vec![1, 2, 3]);
let ba2 = ByteArray::from(vec![3, 4]);
let ba3 = ByteArray::from(vec![1, 2, 4]);
let ba4 = ByteArray::from(vec![]);
let ba5 = ByteArray::from(vec![2, 2, 3]);
assert!(ba1 < ba2);
assert!(ba3 > ba1);
assert!(ba1 > ba4);
assert_eq!(ba1, ba11);
assert!(ba5 > ba1);
}
}