use bytes::Bytes;
use half::f16;
use crate::basic::{ConvertedType, Encoding, LogicalType, Type};
use crate::bloom_filter::Sbbf;
use crate::column::writer::{
compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min,
};
use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
/// A collection of values that can be written to a column chunk.
///
/// Implemented for plain slices of parquet values and (behind the `arrow`
/// feature) for arrow arrays, so the writer can accept either.
pub trait ColumnValues {
/// Returns the number of values in this collection.
fn len(&self) -> usize;
}
#[cfg(feature = "arrow")]
impl ColumnValues for dyn arrow_array::Array {
fn len(&self) -> usize {
// Fully qualified so this dispatches to `arrow_array::Array::len`
// rather than resolving back to `ColumnValues::len` recursively.
arrow_array::Array::len(self)
}
}
impl<T: ParquetValueType> ColumnValues for [T] {
fn len(&self) -> usize {
// Fully qualified to make explicit that this is the inherent slice
// `len`, not a recursive call to `ColumnValues::len`.
<[T]>::len(self)
}
}
/// The encoded contents of a dictionary page, as returned by
/// [`ColumnValueEncoder::flush_dict_page`].
pub struct DictionaryPage {
/// The encoded dictionary entries.
pub buf: Bytes,
/// The number of entries in the dictionary.
pub num_values: usize,
/// Whether the dictionary entries are sorted.
pub is_sorted: bool,
}
/// The encoded values of a data page, together with the statistics gathered
/// while it was being written, as returned by
/// [`ColumnValueEncoder::flush_data_page`].
pub struct DataPageValues<T> {
/// The encoded values.
pub buf: Bytes,
/// The number of values encoded into `buf`.
pub num_values: usize,
/// The encoding used to produce `buf`.
pub encoding: Encoding,
/// The minimum value written to this page, if statistics were collected.
pub min_value: Option<T>,
/// The maximum value written to this page, if statistics were collected.
pub max_value: Option<T>,
/// Total bytes of variable-length data written, if the type has any.
pub variable_length_bytes: Option<i64>,
}
/// A type that can encode column values into the encoded representation of a
/// data page, optionally maintaining a dictionary, statistics and a bloom
/// filter along the way.
pub trait ColumnValueEncoder {
/// The underlying value type of [`Self::Values`]
type T: ParquetValueType;
/// The collection type accepted by [`Self::write`] and [`Self::write_gather`]
type Values: ColumnValues + ?Sized;
/// Creates a new encoder for the column described by `descr`, configured
/// according to `props`.
fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
where
Self: Sized;
/// Writes `len` values from `values`, starting at `offset`.
fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;
/// Writes the values of `values` at the positions given by `indices`.
fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;
/// Returns the number of values written since the last data page flush.
fn num_values(&self) -> usize;
/// Returns `true` if a dictionary encoder is currently active.
fn has_dictionary(&self) -> bool;
/// Returns an estimate of this encoder's total memory usage.
fn estimated_memory_size(&self) -> usize;
/// Returns the estimated encoded size of the dictionary page, if a
/// dictionary encoder is active.
fn estimated_dict_page_size(&self) -> Option<usize>;
/// Returns the estimated encoded size of the next data page.
fn estimated_data_page_size(&self) -> usize;
/// Flushes the dictionary page, if any, consuming the dictionary encoder.
fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>>;
/// Flushes the values buffered since the last flush into a data page.
fn flush_data_page(&mut self) -> Result<DataPageValues<Self::T>>;
/// Takes the accumulated bloom filter, if one was configured.
fn flush_bloom_filter(&mut self) -> Option<Sbbf>;
}
/// The [`ColumnValueEncoder`] implementation for slices of parquet values.
pub struct ColumnValueEncoderImpl<T: DataType> {
/// Fallback encoder, used when no dictionary encoder is active.
encoder: Box<dyn Encoder<T>>,
/// Dictionary encoder, present while dictionary encoding is in effect.
dict_encoder: Option<DictEncoder<T>>,
/// Descriptor of the column being written.
descr: ColumnDescPtr,
/// Number of values written since the last data page flush.
num_values: usize,
/// Level of statistics collection configured for this column.
statistics_enabled: EnabledStatistics,
/// Running minimum of the values written since the last flush, if tracked.
min_value: Option<T::T>,
/// Running maximum of the values written since the last flush, if tracked.
max_value: Option<T::T>,
/// Bloom filter accumulating written values, if enabled.
bloom_filter: Option<Sbbf>,
/// Running total of variable-length bytes written, if the type has any.
variable_length_bytes: Option<i64>,
}
impl<T: DataType> ColumnValueEncoderImpl<T> {
    /// Returns the min and max of `values`, restricted to the positions in
    /// `value_indices` when provided. NaN handling is delegated to
    /// [`get_min_max`].
    fn min_max(&self, values: &[T::T], value_indices: Option<&[usize]>) -> Option<(T::T, T::T)> {
        if let Some(indices) = value_indices {
            get_min_max(&self.descr, indices.iter().map(|i| &values[*i]))
        } else {
            get_min_max(&self.descr, values.iter())
        }
    }

    /// Feeds `slice` through statistics and bloom-filter tracking, then into
    /// the active encoder (the dictionary encoder when one is present).
    fn write_slice(&mut self, slice: &[T::T]) -> Result<()> {
        // INTERVAL columns are excluded from statistics tracking
        let track_statistics = self.statistics_enabled != EnabledStatistics::None
            && self.descr.converted_type() != ConvertedType::INTERVAL;

        if track_statistics {
            if let Some((min, max)) = self.min_max(slice, None) {
                update_min(&self.descr, &min, &mut self.min_value);
                update_max(&self.descr, &max, &mut self.max_value);
            }
            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
            }
        }

        if let Some(bloom_filter) = self.bloom_filter.as_mut() {
            slice.iter().for_each(|value| bloom_filter.insert(value));
        }

        if let Some(dict_encoder) = self.dict_encoder.as_mut() {
            dict_encoder.put(slice)
        } else {
            self.encoder.put(slice)
        }
    }
}
impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
    type T = T::T;
    type Values = [T::T];

    fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
        self.bloom_filter.take()
    }

    fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
        // Dictionary encoding requires both the column properties and the
        // physical type to support it.
        let dict_supported = props.dictionary_enabled(descr.path())
            && has_dictionary_support(T::get_physical_type(), props);
        let dict_encoder = dict_supported.then(|| DictEncoder::new(descr.clone()));

        // Fallback encoder, used when dictionary encoding is not in effect.
        let encoder = get_encoder(
            props
                .encoding(descr.path())
                .unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
            descr,
        )?;

        let statistics_enabled = props.statistics_enabled(descr.path());

        let bloom_filter = props
            .bloom_filter_properties(descr.path())
            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
            .transpose()?;

        Ok(Self {
            encoder,
            dict_encoder,
            descr: descr.clone(),
            num_values: 0,
            statistics_enabled,
            bloom_filter,
            min_value: None,
            max_value: None,
            variable_length_bytes: None,
        })
    }

    fn write(&mut self, values: &[T::T], offset: usize, len: usize) -> Result<()> {
        // Validate the requested range and encode BEFORE bumping `num_values`,
        // so a failed write does not leave the counter out of sync with the
        // values actually buffered in the encoder.
        let slice = values.get(offset..offset + len).ok_or_else(|| {
            general_err!(
                "Expected to write {} values, but have only {}",
                len,
                // saturating_sub avoids an integer-overflow panic when
                // `offset` itself is beyond the end of `values`
                values.len().saturating_sub(offset)
            )
        })?;
        self.write_slice(slice)?;
        self.num_values += len;
        Ok(())
    }

    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
        let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
        self.write_slice(&slice)?;
        // Only count the values once they have been successfully encoded
        self.num_values += indices.len();
        Ok(())
    }

    fn num_values(&self) -> usize {
        self.num_values
    }

    fn has_dictionary(&self) -> bool {
        self.dict_encoder.is_some()
    }

    fn estimated_memory_size(&self) -> usize {
        let encoder_size = self.encoder.estimated_memory_size();
        let dict_encoder_size = self
            .dict_encoder
            .as_ref()
            .map(|encoder| encoder.estimated_memory_size())
            .unwrap_or_default();
        let bloom_filter_size = self
            .bloom_filter
            .as_ref()
            .map(|bf| bf.estimated_memory_size())
            .unwrap_or_default();
        encoder_size + dict_encoder_size + bloom_filter_size
    }

    fn estimated_dict_page_size(&self) -> Option<usize> {
        Some(self.dict_encoder.as_ref()?.dict_encoded_size())
    }

    fn estimated_data_page_size(&self) -> usize {
        match &self.dict_encoder {
            Some(encoder) => encoder.estimated_data_encoded_size(),
            _ => self.encoder.estimated_data_encoded_size(),
        }
    }

    fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
        match self.dict_encoder.take() {
            Some(encoder) => {
                // Any buffered values still reference dictionary indices that
                // would be lost once the dictionary is written out.
                if self.num_values != 0 {
                    return Err(general_err!(
                        "Must flush data pages before flushing dictionary"
                    ));
                }
                let buf = encoder.write_dict()?;
                Ok(Some(DictionaryPage {
                    buf,
                    num_values: encoder.num_entries(),
                    is_sorted: encoder.is_sorted(),
                }))
            }
            _ => Ok(None),
        }
    }

    fn flush_data_page(&mut self) -> Result<DataPageValues<T::T>> {
        let (buf, encoding) = match &mut self.dict_encoder {
            Some(encoder) => (encoder.write_indices()?, Encoding::RLE_DICTIONARY),
            _ => (self.encoder.flush_buffer()?, self.encoder.encoding()),
        };

        // `take` resets the per-page state for the next page
        Ok(DataPageValues {
            buf,
            encoding,
            num_values: std::mem::take(&mut self.num_values),
            min_value: self.min_value.take(),
            max_value: self.max_value.take(),
            variable_length_bytes: self.variable_length_bytes.take(),
        })
    }
}
/// Returns the `(min, max)` of the non-NaN values yielded by `iter`, or `None`
/// when the iterator contains no such values.
///
/// A zero minimum is normalized to `-0.0` and a zero maximum to `+0.0`, so the
/// resulting statistics bound both signed zeros.
fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(T, T)>
where
    T: ParquetValueType + 'a,
    I: Iterator<Item = &'a T>,
{
    // Skip leading NaNs; bail out if there is no non-NaN value at all.
    let first = iter.find(|v| !is_nan(descr, *v))?;
    let (mut min, mut max) = (first, first);

    for candidate in iter {
        if is_nan(descr, candidate) {
            continue;
        }
        if compare_greater(descr, min, candidate) {
            min = candidate;
        }
        if compare_greater(descr, candidate, max) {
            max = candidate;
        }
    }

    Some((
        replace_zero(min, descr, -0.0),
        replace_zero(max, descr, 0.0),
    ))
}
/// Returns `val`, substituting `replace` when `val` is a floating point zero
/// (either sign) of physical type FLOAT, DOUBLE, or a Float16 stored as
/// FIXED_LEN_BYTE_ARRAY. All other values are returned unchanged.
#[inline]
fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace: f32) -> T {
    let bytes = val.as_bytes();
    match T::PHYSICAL_TYPE {
        Type::FLOAT => {
            // `== 0.0` matches both +0.0 and -0.0
            if f32::from_le_bytes(bytes.try_into().unwrap()) == 0.0 {
                return T::try_from_le_slice(&replace.to_le_bytes()).unwrap();
            }
        }
        Type::DOUBLE => {
            if f64::from_le_bytes(bytes.try_into().unwrap()) == 0.0 {
                return T::try_from_le_slice(&f64::from(replace).to_le_bytes()).unwrap();
            }
        }
        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
            // IEEE comparison: NEG_ZERO compares equal to both signed zeros
            if f16::from_le_bytes(bytes.try_into().unwrap()) == f16::NEG_ZERO {
                return T::try_from_le_slice(&f16::from_f32(replace).to_le_bytes()).unwrap();
            }
        }
        _ => {}
    }
    val.clone()
}