1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30 BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43 OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
/// Dispatches an expression to the typed writer held inside a
/// [`ColumnWriter`] enum value: `$e` is matched against every variant,
/// binding the inner `ColumnWriterImpl` to `$i` and evaluating `$b`.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
67
/// Column writer for a Parquet column, with one variant per physical type.
pub enum ColumnWriter<'a> {
    /// Writer for `BOOLEAN` columns
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Writer for `INT32` columns
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Writer for `INT64` columns
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Writer for `INT96` columns
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Writer for `FLOAT` columns
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Writer for `DOUBLE` columns
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Writer for `BYTE_ARRAY` columns
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Writer for `FIXED_LEN_BYTE_ARRAY` columns
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
impl ColumnWriter<'_> {
    /// Returns the estimated memory usage reported by the typed writer.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Flushes currently buffered values into a new data page.
    #[cfg(feature = "arrow")]
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        downcast_writer!(self, typed, typed.add_data_page())
    }

    /// Closes the writer, returning the metadata, bloom filter and
    /// indexes for the written column chunk.
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}
117
118pub fn get_column_writer<'a>(
120 descr: ColumnDescPtr,
121 props: WriterPropertiesPtr,
122 page_writer: Box<dyn PageWriter + 'a>,
123) -> ColumnWriter<'a> {
124 match descr.physical_type() {
125 Type::BOOLEAN => {
126 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127 }
128 Type::INT32 => {
129 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130 }
131 Type::INT64 => {
132 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133 }
134 Type::INT96 => {
135 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136 }
137 Type::FLOAT => {
138 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139 }
140 Type::DOUBLE => {
141 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142 }
143 Type::BYTE_ARRAY => {
144 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
145 }
146 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
147 ColumnWriterImpl::new(descr, props, page_writer),
148 ),
149 }
150}
151
152pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
157 T::get_column_writer(col_writer).unwrap_or_else(|| {
158 panic!(
159 "Failed to convert column writer into a typed column writer for `{}` type",
160 T::get_physical_type()
161 )
162 })
163}
164
165pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
167 col_writer: &'b ColumnWriter<'a>,
168) -> &'b ColumnWriterImpl<'a, T> {
169 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
170 panic!(
171 "Failed to convert column writer into a typed column writer for `{}` type",
172 T::get_physical_type()
173 )
174 })
175}
176
177pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
179 col_writer: &'a mut ColumnWriter<'b>,
180) -> &'a mut ColumnWriterImpl<'b, T> {
181 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
182 panic!(
183 "Failed to convert column writer into a typed column writer for `{}` type",
184 T::get_physical_type()
185 )
186 })
187}
188
/// Result returned when a column writer is closed.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written for this column
    pub bytes_written: u64,
    /// Total number of rows written for this column
    pub rows_written: u64,
    /// Metadata for the written column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter flushed from the encoder
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index (page-level min/max/null statistics)
    pub column_index: Option<ColumnIndexMetaData>,
    /// Optional offset index (page locations and row counts)
    pub offset_index: Option<OffsetIndexMetaData>,
}
207
/// Per-page accounting, reset each time a data page is cut.
#[derive(Default)]
struct PageMetrics {
    // Number of level positions buffered for the current page
    num_buffered_values: u32,
    // Number of rows (records) buffered for the current page
    num_buffered_rows: u32,
    // Number of nulls buffered for the current page
    num_page_nulls: u64,
    // Per-page level histograms; `None` unless enabled via the builders below
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
217
218impl PageMetrics {
219 fn new() -> Self {
220 Default::default()
221 }
222
223 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
225 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
226 self
227 }
228
229 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
231 self.definition_level_histogram = LevelHistogram::try_new(max_level);
232 self
233 }
234
235 fn new_page(&mut self) {
238 self.num_buffered_values = 0;
239 self.num_buffered_rows = 0;
240 self.num_page_nulls = 0;
241 self.repetition_level_histogram
242 .as_mut()
243 .map(LevelHistogram::reset);
244 self.definition_level_histogram
245 .as_mut()
246 .map(LevelHistogram::reset);
247 }
248}
249
/// Column-chunk-level accounting accumulated across all pages.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    // Running min/max over every value written to this column
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    // Caller-supplied distinct count; invalidated after the first batch
    column_distinct_count: Option<u64>,
    // Accumulated variable-length byte count, when reported by the encoder
    variable_length_bytes: Option<i64>,
    // Chunk-level level histograms; `None` unless enabled via the builders
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
268
269impl<T: Default> ColumnMetrics<T> {
270 fn new() -> Self {
271 Default::default()
272 }
273
274 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
276 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
277 self
278 }
279
280 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
282 self.definition_level_histogram = LevelHistogram::try_new(max_level);
283 self
284 }
285
286 fn update_histogram(
288 chunk_histogram: &mut Option<LevelHistogram>,
289 page_histogram: &Option<LevelHistogram>,
290 ) {
291 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
292 chunk_hist.add(page_hist);
293 }
294 }
295
296 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
299 ColumnMetrics::<T>::update_histogram(
300 &mut self.definition_level_histogram,
301 &page_metrics.definition_level_histogram,
302 );
303 ColumnMetrics::<T>::update_histogram(
304 &mut self.repetition_level_histogram,
305 &page_metrics.repetition_level_histogram,
306 );
307 }
308
309 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
311 if let Some(var_bytes) = variable_length_bytes {
312 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
313 }
314 }
315}
316
/// Borrowed view of definition or repetition levels that may be absent,
/// fully materialized as a slice, or a run of one repeated level value.
#[derive(Debug, Clone, Copy)]
pub(crate) enum LevelDataRef<'a> {
    /// No levels were provided
    Absent,
    /// Levels provided as an explicit slice
    Materialized(&'a [i16]),
    /// `count` repetitions of the single level `value`
    Uniform { value: i16, count: usize },
}
330
impl<'a> From<&'a [i16]> for LevelDataRef<'a> {
    /// Wraps an explicit level slice as [`LevelDataRef::Materialized`].
    fn from(levels: &'a [i16]) -> Self {
        Self::Materialized(levels)
    }
}
336
337impl<'a> From<Option<&'a [i16]>> for LevelDataRef<'a> {
338 fn from(levels: Option<&'a [i16]>) -> Self {
339 levels.map_or(Self::Absent, Self::from)
340 }
341}
342
343impl<'a> LevelDataRef<'a> {
344 pub(crate) fn len(self) -> usize {
345 match self {
346 Self::Absent => 0,
347 Self::Materialized(values) => values.len(),
348 Self::Uniform { count, .. } => count,
349 }
350 }
351
352 pub(crate) fn first(self) -> Option<i16> {
353 match self {
354 Self::Absent => None,
355 Self::Materialized(values) => values.first().copied(),
356 Self::Uniform { value, count } => (count > 0).then_some(value),
357 }
358 }
359
360 #[cfg(feature = "arrow")]
361 pub(crate) fn value_at(self, idx: usize) -> Option<i16> {
362 match self {
363 Self::Absent => None,
364 Self::Materialized(values) => values.get(idx).copied(),
365 Self::Uniform { value, count } => (idx < count).then_some(value),
366 }
367 }
368
369 pub(crate) fn slice(self, offset: usize, len: usize) -> Self {
370 match self {
371 Self::Absent => Self::Absent,
372 Self::Materialized(values) => Self::Materialized(&values[offset..offset + len]),
373 Self::Uniform { value, .. } => Self::Uniform { value, count: len },
374 }
375 }
376}
377
/// Typed column writer backed by the default [`ColumnValueEncoderImpl`] encoder.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
380
/// Column writer for a single column chunk, generic over the value
/// encoder `E`.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Descriptor of the column being written
    descr: ColumnDescPtr,

    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    // Sink that pages are ultimately written to
    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    // `None` when the codec performs no compression
    compressor: Option<Box<dyn Codec>>,
    // Encoder for the column values
    encoder: E,

    page_metrics: PageMetrics,
    column_metrics: ColumnMetrics<E::T>,

    // Set of encodings used while writing, reported in the chunk metadata
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Streaming encoder for definition levels
    def_levels_encoder: LevelEncoder,
    // Streaming encoder for repetition levels
    rep_levels_encoder: LevelEncoder,
    // Data pages buffered while dictionary encoding is active, so the
    // dictionary page can be written ahead of them
    data_pages: VecDeque<CompressedPage>,
    column_index_builder: ColumnIndexBuilder,
    // `None` when the offset index is disabled via the writer properties
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Whether page min/max boundaries are so far ascending / descending;
    // both start `true` and are cleared as out-of-order pages are seen
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    // Min/max of the most recent page containing at least one non-null value
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
416
417impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new [`GenericColumnWriter`] for the given column
    /// descriptor, writer properties and page writer.
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        // Levels are always written with RLE, so record it up front.
        let mut encodings = BTreeSet::new();
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Level histograms are tracked only when statistics are enabled.
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // The column index needs page-level statistics; mark the builder
        // invalid otherwise so no index is produced.
        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // The offset index is built unless explicitly disabled.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            def_levels_encoder: Self::create_level_encoder(descr.max_def_level(), &props),
            rep_levels_encoder: Self::create_level_encoder(descr.max_rep_level(), &props),
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            // Boundary order starts as both ascending and descending and is
            // narrowed as page min/max values are observed.
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }
482
    /// Writes a batch of values with their definition and repetition
    /// levels, returning the number of values written.
    ///
    /// `value_indices`, when provided, selects values by index (gather).
    /// `min`/`max`/`distinct_count` are caller-supplied statistics folded
    /// into the column metrics.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: LevelDataRef<'_>,
        rep_levels: LevelDataRef<'_>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // When both level inputs are present they must agree in length.
        if def_levels.len() != 0 && rep_levels.len() != 0 && def_levels.len() != rep_levels.len() {
            return Err(general_err!(
                "Inconsistent length of definition and repetition levels: {} != {}",
                def_levels.len(),
                rep_levels.len()
            ));
        }

        // Number of level positions to process; with no levels at all this
        // falls back to the number of values (or gather indices).
        let num_levels = def_levels.len().max(rep_levels.len());
        let num_levels = if num_levels > 0 {
            num_levels
        } else {
            value_indices.map_or(values.len(), |i| i.len())
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // A caller-provided distinct count is only kept when this is the
        // first batch; later batches invalidate it.
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        // When neither level input is materialized there are no per-record
        // boundaries to respect, so batch by the page row count limit;
        // otherwise use the generic write batch size.
        let both_levels_compact = !matches!(def_levels, LevelDataRef::Materialized(_))
            && !matches!(rep_levels, LevelDataRef::Materialized(_));
        let has_levels = !matches!(def_levels, LevelDataRef::Absent)
            || !matches!(rep_levels, LevelDataRef::Absent);
        let base_batch_size = if both_levels_compact && has_levels {
            self.props.data_page_row_count_limit()
        } else {
            self.props.write_batch_size()
        };
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Never split a record across batches: advance to the next zero
            // repetition level, which marks a record boundary.
            if let LevelDataRef::Materialized(levels) = rep_levels {
                while end_offset < levels.len() && levels[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.slice(levels_offset, end_offset - levels_offset),
                rep_levels.slice(levels_offset, end_offset - levels_offset),
            )?;
            levels_offset = end_offset;
        }

        // Return the total number of values written.
        Ok(values_offset)
    }
571
572 pub fn write_batch(
585 &mut self,
586 values: &E::Values,
587 def_levels: Option<&[i16]>,
588 rep_levels: Option<&[i16]>,
589 ) -> Result<usize> {
590 self.write_batch_internal(
591 values,
592 None,
593 LevelDataRef::from(def_levels),
594 LevelDataRef::from(rep_levels),
595 None,
596 None,
597 None,
598 )
599 }
600
601 pub fn write_batch_with_statistics(
609 &mut self,
610 values: &E::Values,
611 def_levels: Option<&[i16]>,
612 rep_levels: Option<&[i16]>,
613 min: Option<&E::T>,
614 max: Option<&E::T>,
615 distinct_count: Option<u64>,
616 ) -> Result<usize> {
617 self.write_batch_internal(
618 values,
619 None,
620 LevelDataRef::from(def_levels),
621 LevelDataRef::from(rep_levels),
622 min,
623 max,
624 distinct_count,
625 )
626 }
627
628 #[cfg(feature = "arrow")]
633 pub(crate) fn memory_size(&self) -> usize {
634 self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
635 }
636
    /// Returns the total number of bytes written by this writer so far.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
645
646 #[cfg(feature = "arrow")]
652 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
653 self.data_pages
654 .iter()
655 .map(|page| page.data().len() as u64)
656 .sum::<u64>()
657 + self.column_metrics.total_bytes_written
658 + self.encoder.estimated_data_page_size() as u64
659 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
660 }
661
    /// Returns the total number of rows written by this writer so far.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
667
    /// Returns a reference to this column's descriptor.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
672
    /// Finalizes all pending pages and closes this writer, returning the
    /// bytes/rows written along with the chunk metadata, bloom filter and
    /// any column/offset indexes.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        // Flush any remaining buffered values into a final data page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        // Write the dictionary page before flushing the buffered data pages
        // so it precedes them in the chunk.
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        // When pages are both ascending and descending (e.g. a single
        // page), ascending is preferred.
        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        let column_index = match self.column_index_builder.valid() {
            true => Some(self.column_index_builder.build()?),
            false => None,
        };

        let offset_index = self.offset_index_builder.map(|b| b.build());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }
714
    /// Creates a streaming level encoder matching the configured writer
    /// version (v1 and v2 use different streaming level encodings).
    fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
        match props.writer_version() {
            WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level),
            WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
        }
    }
722
    /// Writes one mini batch of values and levels, updating page metrics
    /// and cutting a new page / triggering dictionary fallback when the
    /// configured limits are reached. Returns the number of values written.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: LevelDataRef<'_>,
        rep_levels: LevelDataRef<'_>,
    ) -> Result<usize> {
        // For nullable columns, count level positions carrying an actual
        // value (level == max_def_level) while encoding the levels.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let max_def = self.descr.max_def_level();
            match def_levels {
                LevelDataRef::Absent => {
                    return Err(general_err!(
                        "Definition levels are required, because max definition level = {}",
                        self.descr.max_def_level()
                    ));
                }
                LevelDataRef::Materialized(levels) => {
                    // Observe each encoded run to count values and,
                    // when enabled, fill the definition level histogram.
                    let mut values_to_write = 0usize;
                    let encoder = &mut self.def_levels_encoder;
                    match self.page_metrics.definition_level_histogram.as_mut() {
                        Some(histogram) => encoder.put_with_observer(levels, |level, count| {
                            values_to_write += count * (level == max_def) as usize;
                            histogram.increment_by(level, count as i64);
                        }),
                        None => encoder.put_with_observer(levels, |level, count| {
                            values_to_write += count * (level == max_def) as usize;
                        }),
                    };
                    // Positions below max_def_level are nulls.
                    self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
                    values_to_write
                }
                LevelDataRef::Uniform { value, count } => {
                    // A uniform run encodes one level repeated `count` times.
                    let encoder = &mut self.def_levels_encoder;
                    match self.page_metrics.definition_level_histogram.as_mut() {
                        Some(histogram) => {
                            encoder.put_n_with_observer(value, count, |level, run_len| {
                                histogram.increment_by(level, run_len as i64);
                            })
                        }
                        None => encoder.put_n_with_observer(value, count, |_, _| {}),
                    };
                    let values_to_write = count * (value == max_def) as usize;
                    self.page_metrics.num_page_nulls += (count - values_to_write) as u64;
                    values_to_write
                }
            }
        } else {
            // Required column: every level position carries a value.
            num_levels
        };

        // For repeated columns, encode repetition levels and count new rows
        // (each zero repetition level starts a new record).
        if self.descr.max_rep_level() > 0 {
            let first_level = rep_levels.first().ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            // Batches must begin at a record boundary.
            if first_level != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    first_level
                ));
            }

            let mut new_rows = 0u32;
            match rep_levels {
                // `first()` returned `Some` above, so levels cannot be absent.
                LevelDataRef::Absent => unreachable!(),
                LevelDataRef::Materialized(levels) => {
                    let encoder = &mut self.rep_levels_encoder;
                    match self.page_metrics.repetition_level_histogram.as_mut() {
                        Some(histogram) => encoder.put_with_observer(levels, |level, count| {
                            new_rows += (count as u32) * (level == 0) as u32;
                            histogram.increment_by(level, count as i64);
                        }),
                        None => encoder.put_with_observer(levels, |level, count| {
                            new_rows += (count as u32) * (level == 0) as u32;
                        }),
                    };
                }
                LevelDataRef::Uniform { value, count } => {
                    let encoder = &mut self.rep_levels_encoder;
                    match self.page_metrics.repetition_level_histogram.as_mut() {
                        Some(histogram) => {
                            encoder.put_n_with_observer(value, count, |level, run_len| {
                                new_rows += (run_len as u32) * (level == 0) as u32;
                                histogram.increment_by(level, run_len as i64);
                            })
                        }
                        None => encoder.put_n_with_observer(value, count, |level, run_len| {
                            new_rows += (run_len as u32) * (level == 0) as u32;
                        }),
                    };
                }
            }
            self.page_metrics.num_buffered_rows += new_rows;
        } else {
            // Without repetition levels each level position is its own row.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        // Write the non-null values, either gathered by index or contiguous.
        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        // Cut a page and/or fall back from dictionary encoding once the
        // configured limits are hit.
        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
857
858 #[inline]
863 fn should_dict_fallback(&self) -> bool {
864 match self.encoder.estimated_dict_page_size() {
865 Some(size) => {
866 size >= self
867 .props
868 .column_dictionary_page_size_limit(self.descr.path())
869 }
870 None => false,
871 }
872 }
873
874 #[inline]
876 fn should_add_data_page(&self) -> bool {
877 if self.page_metrics.num_buffered_values == 0 {
882 return false;
883 }
884
885 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
886 || self.encoder.estimated_data_page_size()
887 >= self.props.column_data_page_size_limit(self.descr.path())
888 }
889
    /// Performs dictionary fallback: flushes any buffered values into a
    /// page, writes the dictionary page, then flushes all buffered pages.
    fn dict_fallback(&mut self) -> Result<()> {
        // Flush values buffered under the dictionary encoding first.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }
901
    /// Updates the column and offset index builders with the statistics of
    /// the page about to be flushed.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // A page where every row is null has no min/max to record.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // The column index is only built while the builder remains valid
        // (i.e. page statistics are enabled and available).
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            match &page_statistics {
                None => {
                    // Without page statistics the index cannot be completed.
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Non-null-page statistics carry min/max here.
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    // Track whether successive page boundaries remain in
                    // ascending and/or descending order.
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    // Truncate byte-array min/max when the type permits it;
                    // otherwise record the full values.
                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Accumulate this page's level histograms into the column index.
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Record the page's row count and unencoded byte size in the
        // offset index, when enabled.
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
991
992 fn can_truncate_value(&self) -> bool {
994 match self.descr.physical_type() {
995 Type::FIXED_LEN_BYTE_ARRAY
999 if !matches!(
1000 self.descr.logical_type_ref(),
1001 Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
1002 ) =>
1003 {
1004 true
1005 }
1006 Type::BYTE_ARRAY => true,
1007 _ => false,
1009 }
1010 }
1011
1012 fn is_utf8(&self) -> bool {
1014 self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
1015 || self.get_descriptor().converted_type() == ConvertedType::UTF8
1016 }
1017
1018 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
1029 truncation_length
1030 .filter(|l| data.len() > *l)
1031 .and_then(|l|
1032 if self.is_utf8() {
1034 match str::from_utf8(data) {
1035 Ok(str_data) => truncate_utf8(str_data, l),
1036 Err(_) => Some(data[..l].to_vec()),
1037 }
1038 } else {
1039 Some(data[..l].to_vec())
1040 }
1041 )
1042 .map(|truncated| (truncated, true))
1043 .unwrap_or_else(|| (data.to_vec(), false))
1044 }
1045
1046 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
1060 truncation_length
1061 .filter(|l| data.len() > *l)
1062 .and_then(|l|
1063 if self.is_utf8() {
1065 match str::from_utf8(data) {
1066 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
1067 Err(_) => increment(data[..l].to_vec()),
1068 }
1069 } else {
1070 increment(data[..l].to_vec())
1071 }
1072 )
1073 .map(|truncated| (truncated, true))
1074 .unwrap_or_else(|| (data.to_vec(), false))
1075 }
1076
    /// Applies the configured statistics truncation length to byte-array
    /// min/max values, recording whether the bounds remain exact. All
    /// other statistics are returned unchanged.
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        // Whether the legacy (backwards compatible) min/max fields are kept
        // depends on the column's sort order being signed.
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    // A truncated bound is no longer an exact value.
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // Fixed-length byte arrays are only truncated when the logical
            // type permits it (see `can_truncate_value`).
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            stats => stats,
        }
    }
1129
    /// Flushes the currently buffered values into a data page. The page is
    /// buffered when dictionary encoding is active (so the dictionary page
    /// can be written first), otherwise written directly.
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        // Extract the encoded values for this page.
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Fold the page min/max into the column min/max and, when page
        // statistics are enabled, capture them for the index/page header.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // Update the column/offset indexes before page metrics are reset.
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Accumulate page histograms and variable-length byte counts into
        // the chunk-level metrics.
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // Page statistics only go into the page header when configured;
        // they are truncated like all emitted statistics.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                // v1 pages concatenate rep levels, def levels and values,
                // then compress the whole buffer together.
                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                // v2 pages store the levels uncompressed ahead of the
                // (possibly compressed) values.
                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    rep_levels_byte_len = buffer.len();
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    def_levels_byte_len = buffer.len() - rep_levels_byte_len;
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Compress only the values; if the result does not beat the
                // configured ratio threshold, roll back and store them raw.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        let compressed_values_size = buffer.len() - buffer_len;
                        let threshold = self
                            .props
                            .column_data_page_v2_compression_ratio_threshold(self.descr.path());
                        if (compressed_values_size as f64) >= (uncompressed_size as f64) * threshold
                        {
                            // Compression not effective: keep the raw values.
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // Buffer the page until the dictionary page is written, or flush it
        // straight to the sink.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
        // Reset per-page state for the next page.
        self.page_metrics.new_page();

        Ok(())
    }
1285
1286 #[inline]
1289 fn flush_data_pages(&mut self) -> Result<()> {
1290 if self.page_metrics.num_buffered_values > 0 {
1292 self.add_data_page()?;
1293 }
1294
1295 while let Some(page) = self.data_pages.pop_front() {
1296 self.write_data_page(page)?;
1297 }
1298
1299 Ok(())
1300 }
1301
    /// Assembles the final [`ColumnChunkMetaData`] for this column chunk from
    /// the metrics accumulated while writing pages: sizes, offsets, encodings,
    /// and (when enabled) value statistics and level histograms.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        // Totals accumulated across all pages written for this chunk.
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // data_page_offset defaults to 0 when no data page was ever written.
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            // Old readers expect signed-comparison min/max; only emit the
            // legacy fields when the column's sort order is signed.
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            // Honor the configured statistics truncation length.
            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                // `take()` moves the histograms out; they are no longer needed
                // on the metrics struct once the chunk is finalized.
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        // No-op unless the `encryption` feature is enabled and configured.
        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1356
1357 #[inline]
1359 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1360 self.encodings.insert(page.encoding());
1361 match self.encoding_stats.last_mut() {
1362 Some(encoding_stats)
1363 if encoding_stats.page_type == page.page_type()
1364 && encoding_stats.encoding == page.encoding() =>
1365 {
1366 encoding_stats.count += 1;
1367 }
1368 _ => {
1369 self.encoding_stats.push(PageEncodingStats {
1372 page_type: page.page_type(),
1373 encoding: page.encoding(),
1374 count: 1,
1375 });
1376 }
1377 }
1378 let page_spec = self.page_writer.write_page(page)?;
1379 if let Some(builder) = self.offset_index_builder.as_mut() {
1382 builder
1383 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1384 }
1385 self.update_metrics_for_page(page_spec);
1386 Ok(())
1387 }
1388
1389 #[inline]
1391 fn write_dictionary_page(&mut self) -> Result<()> {
1392 let compressed_page = {
1393 let mut page = self
1394 .encoder
1395 .flush_dict_page()?
1396 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1397
1398 let uncompressed_size = page.buf.len();
1399
1400 if let Some(ref mut cmpr) = self.compressor {
1401 let mut output_buf = Vec::with_capacity(uncompressed_size);
1402 cmpr.compress(&page.buf, &mut output_buf)?;
1403 page.buf = Bytes::from(output_buf);
1404 }
1405
1406 let dict_page = Page::DictionaryPage {
1407 buf: page.buf,
1408 num_values: page.num_values as u32,
1409 encoding: self.props.dictionary_page_encoding(),
1410 is_sorted: page.is_sorted,
1411 };
1412 CompressedPage::new(dict_page, uncompressed_size)
1413 };
1414
1415 self.encodings.insert(compressed_page.encoding());
1416 self.encoding_stats.push(PageEncodingStats {
1417 page_type: PageType::DICTIONARY_PAGE,
1418 encoding: compressed_page.encoding(),
1419 count: 1,
1420 });
1421 let page_spec = self.page_writer.write_page(compressed_page)?;
1422 self.update_metrics_for_page(page_spec);
1423 Ok(())
1425 }
1426
1427 #[inline]
1429 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1430 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1431 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1432 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1433
1434 match page_spec.page_type {
1435 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1436 self.column_metrics.total_num_values += page_spec.num_values as u64;
1437 if self.column_metrics.data_page_offset.is_none() {
1438 self.column_metrics.data_page_offset = Some(page_spec.offset);
1439 }
1440 }
1441 PageType::DICTIONARY_PAGE => {
1442 assert!(
1443 self.column_metrics.dictionary_page_offset.is_none(),
1444 "Dictionary offset is already set"
1445 );
1446 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1447 }
1448 _ => {}
1449 }
1450 }
1451
1452 #[inline]
1453 #[cfg(feature = "encryption")]
1454 fn set_column_chunk_encryption_properties(
1455 &self,
1456 builder: ColumnChunkMetaDataBuilder,
1457 ) -> ColumnChunkMetaDataBuilder {
1458 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1459 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1460 encryption_properties,
1461 &self.descr,
1462 ))
1463 } else {
1464 builder
1465 }
1466 }
1467
    #[inline]
    #[cfg(not(feature = "encryption"))]
    /// No-op stand-in used when the `encryption` feature is disabled; returns
    /// the builder unchanged so callers need no feature-gated branching.
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1476}
1477
/// Lowers `min` to `val` when the current minimum is unset or greater than
/// `val` under the column's type-aware ordering. NaN values are skipped
/// entirely inside `update_stat`.
fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}
1481
/// Raises `max` to `val` when the current maximum is unset or less than
/// `val` under the column's type-aware ordering. NaN values are skipped
/// entirely inside `update_stat`.
fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}
1485
1486#[inline]
1487#[allow(clippy::eq_op)]
1488fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1489 match T::PHYSICAL_TYPE {
1490 Type::FLOAT | Type::DOUBLE => val != val,
1491 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
1492 let val = val.as_bytes();
1493 let val = f16::from_le_bytes([val[0], val[1]]);
1494 val.is_nan()
1495 }
1496 _ => false,
1497 }
1498}
1499
1500fn update_stat<T: ParquetValueType, F>(
1505 descr: &ColumnDescriptor,
1506 val: &T,
1507 cur: &mut Option<T>,
1508 should_update: F,
1509) where
1510 F: Fn(&T) -> bool,
1511{
1512 if is_nan(descr, val) {
1513 return;
1514 }
1515
1516 if cur.as_ref().is_none_or(should_update) {
1517 *cur = Some(val.clone());
1518 }
1519}
1520
/// Evaluates `a > b` using the column's logical/converted type where it
/// changes the ordering (unsigned integers, big-endian decimal byte arrays,
/// Float16), falling back to the physical type's natural ordering otherwise.
fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::INT32 | Type::INT64 => {
            // Unsigned logical types must not be compared as signed integers.
            if let Some(LogicalType::Integer {
                is_signed: false, ..
            }) = descr.logical_type_ref()
            {
                return compare_greater_unsigned_int(a, b);
            }

            // Legacy converted-type spelling of the same unsignedness.
            match descr.converted_type() {
                ConvertedType::UINT_8
                | ConvertedType::UINT_16
                | ConvertedType::UINT_32
                | ConvertedType::UINT_64 => {
                    return compare_greater_unsigned_int(a, b);
                }
                _ => {}
            };
        }
        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
            // Decimals are stored as big-endian two's complement and need a
            // sign-aware byte comparison (logical or converted type spelling).
            if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            if let ConvertedType::DECIMAL = descr.converted_type() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            // Float16 is a 2-byte little-endian FLBA compared as a float.
            if let Some(LogicalType::Float16) = descr.logical_type_ref() {
                return compare_greater_f16(a.as_bytes(), b.as_bytes());
            }
        }

        _ => {}
    }

    a > b
}
1561
1562fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1570 match (kind, props.writer_version()) {
1571 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1572 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1573 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1574 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1575 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1576 _ => Encoding::PLAIN,
1577 }
1578}
1579
1580fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1582 match (kind, props.writer_version()) {
1583 (Type::BOOLEAN, _) => false,
1585 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1587 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1588 _ => true,
1589 }
1590}
1591
#[inline]
/// Compares two integer values as unsigned 64-bit quantities, so e.g. a
/// UINT_32 value of 0xFFFF_FFFF sorts above 0 rather than below it.
/// `as_u64` is expected to succeed for the integer physical types this is
/// called with (see `compare_greater`).
fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
    a.as_u64().unwrap() > b.as_u64().unwrap()
}
1596
1597#[inline]
1598fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1599 let a = f16::from_le_bytes(a.try_into().unwrap());
1600 let b = f16::from_le_bytes(b.try_into().unwrap());
1601 a > b
1602}
1603
/// Signed comparison (`a > b`) of two big-endian two's-complement decimal
/// byte arrays, tolerating operands of different lengths by treating the
/// shorter one as sign-extended.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    // An empty operand sorts below any non-empty one; `a > b` holds only
    // when `b` is the empty side.
    if a.is_empty() || b.is_empty() {
        return !a.is_empty();
    }

    let (first_a, first_b) = (a[0], b[0]);
    let a_negative = (first_a as i8) < 0;
    let b_negative = (first_b as i8) < 0;

    // Differing signs, or equal-length operands whose leading bytes differ,
    // are decided by a signed compare of those leading bytes.
    if a_negative != b_negative || (a.len() == b.len() && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    if a.len() != b.len() {
        // Same sign, different lengths: if the longer operand's extra lead
        // bytes contain anything besides the sign-extension byte, the longer
        // operand has greater magnitude.
        let extension: u8 = if a_negative { 0xFF } else { 0 };
        let (longer, shorter_len) = if a.len() > b.len() {
            (a, b.len())
        } else {
            (b, a.len())
        };
        let has_significant_lead = longer[..longer.len() - shorter_len]
            .iter()
            .any(|&byte| byte != extension);

        if has_significant_lead {
            // Larger magnitude means greater for positives, smaller for
            // negatives.
            let a_longer = a.len() > b.len();
            return if a_negative { !a_longer } else { a_longer };
        }
        // NOTE(review): when the lead bytes all equal the extension byte the
        // tails are compared from index 1 below, which assumes those lead
        // bytes are redundant sign-extension padding. Minimal encodings such
        // as [0xFF, 0x00] (-256) vs [0xFF] (-1) violate that assumption —
        // confirm callers supply consistently padded representations.
    }

    // Same sign and (effectively) equal length: lexicographic comparison of
    // everything after the first byte decides.
    (a[1..]) > (b[1..])
}
1649
/// Returns the longest prefix of `data` that is at most `length` bytes and
/// ends on a UTF-8 character boundary, as raw bytes. Returns `None` when no
/// non-empty such prefix exists (`length == 0`, empty input, or the first
/// character alone exceeds `length`).
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk the candidate split point down until it lands on a boundary.
    let mut end = length.min(data.len());
    while end > 0 && !data.is_char_boundary(end) {
        end -= 1;
    }
    (end > 0).then(|| data.as_bytes()[..end].to_vec())
}
1659
/// Truncates `data` to at most `length` bytes on a UTF-8 boundary, then
/// increments the truncated prefix via [`increment_utf8`] — presumably so the
/// result stays an upper bound when truncating max statistics (TODO confirm
/// against callers).
///
/// The split point is searched only in `length - 3 ..= length`: a UTF-8
/// character spans at most 4 bytes, so a boundary must fall in that window
/// whenever `length >= 4`. Returns `None` if no boundary is found there or
/// the prefix cannot be incremented.
fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let lower_bound = length.saturating_sub(3);
    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
    increment_utf8(data.get(..split)?)
}
1671
/// Produces a valid UTF-8 byte sequence that compares strictly greater than
/// `data`, by scanning right-to-left for a character that can be bumped to
/// the next code point of the same encoded width. Bytes after the bumped
/// character are dropped. Returns `None` when no character can be bumped.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    for (idx, ch) in data.char_indices().rev() {
        let width = ch.len_utf8();
        // Skip characters whose successor is invalid (e.g. surrogate range)
        // or would change the encoded byte width.
        let bumped = match char::from_u32(ch as u32 + 1) {
            Some(next) if next.len_utf8() == width => next,
            _ => continue,
        };
        // Keep everything up to and including this character's slot, then
        // overwrite that slot with the incremented character in place.
        let mut out = data.as_bytes()[..idx + width].to_vec();
        bumped.encode_utf8(&mut out[idx..]);
        return Some(out);
    }

    None
}
1693
/// Treats `data` as a big-endian unsigned integer and adds one, propagating
/// carries right-to-left. Returns `None` when every byte is 0xFF, i.e. the
/// value cannot be incremented without growing the array.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    for idx in (0..data.len()).rev() {
        let (bumped, carry) = data[idx].overflowing_add(1);
        data[idx] = bumped;

        if !carry {
            return Some(data);
        }
    }

    None
}
1709
1710#[cfg(test)]
1711mod tests {
1712 use crate::{
1713 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1714 schema::parser::parse_message_type,
1715 };
1716 use core::str;
1717 use rand::distr::uniform::SampleUniform;
1718 use std::{fs::File, sync::Arc};
1719
1720 use crate::column::{
1721 page::PageReader,
1722 reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1723 };
1724 use crate::file::writer::TrackedWrite;
1725 use crate::file::{
1726 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1727 };
1728 use crate::schema::types::{ColumnPath, Type as SchemaType};
1729 use crate::util::test_common::rand_gen::random_numbers_range;
1730
1731 use super::*;
1732
    /// Definition and repetition level slices of different lengths must be
    /// rejected with a descriptive error.
    #[test]
    fn test_column_writer_inconsistent_def_rep_length() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
            );
        }
    }
1747
    /// Omitting definition levels on a column with max definition level 1
    /// must fail.
    #[test]
    fn test_column_writer_invalid_def_levels() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Definition levels are required, because max definition level = 1"
            );
        }
    }
1762
    /// Omitting repetition levels on a column with max repetition level 1
    /// must fail.
    #[test]
    fn test_column_writer_invalid_rep_levels() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Repetition levels are required, because max repetition level = 1"
            );
        }
    }
1777
    /// The definition levels imply 4 non-null values but only 2 are supplied;
    /// the writer must report the shortfall.
    #[test]
    fn test_column_writer_not_enough_values_to_write() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Expected to write 4 values, but have only 2"
            );
        }
    }
1792
    /// A dictionary page may be flushed only once; a second flush fails
    /// because the dictionary encoder has already been consumed.
    #[test]
    fn test_column_writer_write_only_one_dictionary_page() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        writer.write_dictionary_page().unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }
1805
    /// With dictionary encoding disabled, attempting to flush a dictionary
    /// page must fail.
    #[test]
    fn test_column_writer_error_when_writing_disabled_dictionary() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_dictionary_enabled(false)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }
1819
1820 #[test]
1821 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1822 let page_writer = get_test_page_writer();
1823 let props = Arc::new(
1824 WriterProperties::builder()
1825 .set_dictionary_enabled(true)
1826 .build(),
1827 );
1828 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1829 writer
1830 .write_batch(&[true, false, true, false], None, None)
1831 .unwrap();
1832
1833 let r = writer.close().unwrap();
1834 assert_eq!(r.bytes_written, 1);
1837 assert_eq!(r.rows_written, 4);
1838
1839 let metadata = r.metadata;
1840 assert_eq!(
1841 metadata.encodings().collect::<Vec<_>>(),
1842 vec![Encoding::PLAIN, Encoding::RLE]
1843 );
1844 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1846 }
1847
    /// Checks the encodings and page encoding stats produced for BOOLEAN
    /// columns under v1/v2 writers (the bool flag presumably toggles
    /// dictionary encoding — see `check_encoding_write_support`).
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1883
    /// Checks the encodings and page encoding stats produced for INT32
    /// columns under v1/v2 writers, with and without dictionary encoding.
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1929
    /// Checks the encodings and page encoding stats produced for INT64
    /// columns under v1/v2 writers, with and without dictionary encoding.
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1975
    /// Checks the encodings and page encoding stats produced for INT96
    /// columns; note INT96 has no delta fallback and stays PLAIN under v2.
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
2017
    /// Checks the encodings and page encoding stats produced for FLOAT
    /// columns; floats have no delta fallback and stay PLAIN under v2.
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
2059
    /// Checks the encodings and page encoding stats produced for DOUBLE
    /// columns; doubles have no delta fallback and stay PLAIN under v2.
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
2101
    /// Checks the encodings and page encoding stats produced for BYTE_ARRAY
    /// columns; the v2 non-dictionary fallback is DELTA_BYTE_ARRAY.
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2147
    /// Checks the encodings and page encoding stats produced for
    /// FIXED_LEN_BYTE_ARRAY columns: no dictionary under the v1 writer,
    /// dictionary under v2, DELTA_BYTE_ARRAY fallback under v2.
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2190
    /// End-to-end check of the column chunk metadata produced for a simple
    /// INT32 column: byte counts, offsets, encodings, and min/max statistics.
    #[test]
    fn test_column_writer_check_metadata() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &1);
                assert_eq!(stats.max_opt().unwrap(), &4);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2225
    /// Decimal byte arrays must be compared sign-aware: the all-0xFF-prefixed
    /// (negative) values are the minimum and a positive value is the maximum,
    /// even though a plain lexicographic byte compare would say otherwise.
    #[test]
    fn test_column_writer_check_byte_array_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
        writer
            .write_batch(
                &[
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
                        152u8, 177u8, 56u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
                        0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ]),
                ],
                None,
                None,
            )
            .unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::ByteArray(stats) = stats {
                assert_eq!(
                    stats.min_opt().unwrap(),
                    &ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ])
                );
                assert_eq!(
                    stats.max_opt().unwrap(),
                    &ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ])
                );
            } else {
                panic!("expecting Statistics::ByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2279
2280 #[test]
2281 fn test_column_writer_uint32_converted_type_min_max() {
2282 let page_writer = get_test_page_writer();
2283 let props = Default::default();
2284 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2285 page_writer,
2286 0,
2287 0,
2288 props,
2289 );
2290 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2291 let metadata = writer.close().unwrap().metadata;
2292 if let Some(stats) = metadata.statistics() {
2293 if let Statistics::Int32(stats) = stats {
2294 assert_eq!(stats.min_opt().unwrap(), &0,);
2295 assert_eq!(stats.max_opt().unwrap(), &5,);
2296 } else {
2297 panic!("expecting Statistics::Int32");
2298 }
2299 } else {
2300 panic!("metadata missing statistics");
2301 }
2302 }
2303
    /// With chunk-level statistics enabled, caller-supplied min/max/distinct
    /// values must be carried through to the chunk metadata unchanged.
    #[test]
    fn test_column_writer_precalculated_statistics() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer
            .write_batch_with_statistics(
                &[1, 2, 3, 4],
                None,
                None,
                Some(&-17),
                Some(&9000),
                Some(55),
            )
            .unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &-17);
                assert_eq!(stats.max_opt().unwrap(), &9000);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2351
    /// Mixing plain `write_batch` with `write_batch_with_statistics` merges the
    /// computed and supplied statistics; page-header statistics are written and
    /// can be read back from the serialized pages.
    #[test]
    fn test_mixed_precomputed_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        // Merged chunk stats: computed min (1) and supplied max (7); the
        // supplied distinct count is not retained after the merge.
        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        // Release the mutable borrow on `buf` so it can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .set_read_page_statistics(true)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        // The data page header carries the same merged statistics.
        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }
2410
    /// With `EnabledStatistics::None`, neither chunk metadata nor v2 data page
    /// headers carry statistics — but the v2 page still records its
    /// num_values/num_nulls/num_rows counts.
    #[test]
    fn test_disabled_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        // max_def_level = 1: def level 0 marks a null value.
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        assert!(r.metadata.statistics().is_none());

        // Release the mutable borrow on `buf` so it can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                // 6 def levels, 2 of which are zero (null).
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2466
2467 #[test]
2468 fn test_column_writer_empty_column_roundtrip() {
2469 let props = Default::default();
2470 column_roundtrip::<Int32Type>(props, &[], None, None);
2471 }
2472
2473 #[test]
2474 fn test_column_writer_non_nullable_values_roundtrip() {
2475 let props = Default::default();
2476 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2477 }
2478
2479 #[test]
2480 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2481 let props = Default::default();
2482 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2483 }
2484
2485 #[test]
2486 fn test_column_writer_nullable_repeated_values_roundtrip() {
2487 let props = Default::default();
2488 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2489 }
2490
2491 #[test]
2492 fn test_column_writer_dictionary_fallback_small_data_page() {
2493 let props = WriterProperties::builder()
2494 .set_dictionary_page_size_limit(32)
2495 .set_data_page_size_limit(32)
2496 .build();
2497 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2498 }
2499
2500 #[test]
2501 fn test_column_writer_small_write_batch_size() {
2502 for i in &[1usize, 2, 5, 10, 11, 1023] {
2503 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2504
2505 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2506 }
2507 }
2508
2509 #[test]
2510 fn test_column_writer_dictionary_disabled_v1() {
2511 let props = WriterProperties::builder()
2512 .set_writer_version(WriterVersion::PARQUET_1_0)
2513 .set_dictionary_enabled(false)
2514 .build();
2515 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2516 }
2517
2518 #[test]
2519 fn test_column_writer_dictionary_disabled_v2() {
2520 let props = WriterProperties::builder()
2521 .set_writer_version(WriterVersion::PARQUET_2_0)
2522 .set_dictionary_enabled(false)
2523 .build();
2524 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2525 }
2526
2527 #[test]
2528 fn test_column_writer_compression_v1() {
2529 let props = WriterProperties::builder()
2530 .set_writer_version(WriterVersion::PARQUET_1_0)
2531 .set_compression(Compression::SNAPPY)
2532 .build();
2533 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2534 }
2535
2536 #[test]
2537 fn test_column_writer_compression_v2() {
2538 let props = WriterProperties::builder()
2539 .set_writer_version(WriterVersion::PARQUET_2_0)
2540 .set_compression(Compression::SNAPPY)
2541 .build();
2542 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2543 }
2544
    /// The v2 data page compression-ratio threshold decides whether the page
    /// body is stored compressed or left uncompressed.
    #[test]
    fn test_column_writer_v2_compression_ratio_threshold() {
        // Writes 4096 identical values with SNAPPY at the given threshold,
        // reads the page back, and returns the page's `is_compressed` flag.
        fn write_v2_page(threshold: f64) -> bool {
            let mut buf = Vec::with_capacity(4096);
            let mut write = TrackedWrite::new(&mut buf);
            let page_writer = Box::new(SerializedPageWriter::new(&mut write));
            let props = Arc::new(
                WriterProperties::builder()
                    .set_writer_version(WriterVersion::PARQUET_2_0)
                    .set_compression(Compression::SNAPPY)
                    .set_dictionary_enabled(false)
                    .set_data_page_v2_compression_ratio_threshold(threshold)
                    .build(),
            );

            let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
            // Highly compressible payload: one repeated value.
            let values: Vec<i32> = vec![42; 4096];
            writer.write_batch(&values, None, None).unwrap();
            let r = writer.close().unwrap();
            drop(write);

            let reader_props = ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .build();
            let reader = SerializedPageReader::new_with_properties(
                Arc::new(Bytes::from(buf)),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(reader_props),
            )
            .unwrap();
            let pages = reader.collect::<Result<Vec<_>>>().unwrap();
            let data_page = pages
                .iter()
                .find(|p| p.page_type() == PageType::DATA_PAGE_V2)
                .expect("expected a v2 data page");
            match data_page {
                Page::DataPageV2 { is_compressed, .. } => *is_compressed,
                _ => unreachable!(),
            }
        }

        // Permissive threshold keeps the compressed body; a very strict one
        // writes the page uncompressed.
        assert!(write_v2_page(1.0));
        assert!(!write_v2_page(0.001));
    }
2593
    /// A small data page size limit splits a dictionary-encoded column into
    /// multiple data pages; verifies page sizes/counts and encoding stats.
    #[test]
    fn test_column_writer_add_data_pages_with_dict() {
        // Use a real temp file so the pages can be read back below.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_data_page_size_limit(10)
                .set_write_batch_size(3)
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the borrow on `file` so it can be reopened for reading.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        // Collect (page type, value count, buffer length) for each page.
        let mut res = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            res.push((page.page_type(), page.num_values(), page.buffer().len()));
        }
        // One dictionary page followed by two dictionary-encoded data pages.
        assert_eq!(
            res,
            vec![
                (PageType::DICTIONARY_PAGE, 10, 40),
                (PageType::DATA_PAGE, 9, 10),
                (PageType::DATA_PAGE, 1, 3),
            ]
        );
        assert_eq!(
            r.metadata.page_encoding_stats(),
            Some(&vec![
                PageEncodingStats {
                    page_type: PageType::DICTIONARY_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 1
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE_DICTIONARY,
                    count: 2,
                }
            ])
        );
    }
2656
    /// A per-column data page size limit overrides the global limit for that
    /// column only; other columns keep using the global limit.
    #[test]
    fn test_column_writer_column_data_page_size_limit() {
        let props = Arc::new(
            WriterProperties::builder()
                .set_writer_version(WriterVersion::PARQUET_1_0)
                .set_dictionary_enabled(false)
                .set_data_page_size_limit(1000)
                .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
                .set_write_batch_size(3)
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        let col_values =
            write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
        let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);

        // "col" (10-byte limit) is split into pages of 3/3/3/1 values;
        // "other" (1000-byte global limit) fits in a single page.
        assert_eq!(col_values, vec![3, 3, 3, 1]);
        assert_eq!(other_values, vec![10]);
    }
2677
2678 #[test]
2679 fn test_bool_statistics() {
2680 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2681 assert!(!stats.is_min_max_backwards_compatible());
2684 if let Statistics::Boolean(stats) = stats {
2685 assert_eq!(stats.min_opt().unwrap(), &false);
2686 assert_eq!(stats.max_opt().unwrap(), &true);
2687 } else {
2688 panic!("expecting Statistics::Boolean, got {stats:?}");
2689 }
2690 }
2691
    /// Int32 statistics round-trip: signed min/max over a small batch.
    #[test]
    fn test_int32_statistics() {
        let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Int32(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2);
            assert_eq!(stats.max_opt().unwrap(), &3);
        } else {
            panic!("expecting Statistics::Int32, got {stats:?}");
        }
    }
2703
    /// Int64 statistics round-trip: signed min/max over a small batch.
    #[test]
    fn test_int64_statistics() {
        let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Int64(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2);
            assert_eq!(stats.max_opt().unwrap(), &3);
        } else {
            panic!("expecting Statistics::Int64, got {stats:?}");
        }
    }
2715
2716 #[test]
2717 fn test_int96_statistics() {
2718 let input = vec![
2719 Int96::from(vec![1, 20, 30]),
2720 Int96::from(vec![3, 20, 10]),
2721 Int96::from(vec![0, 20, 30]),
2722 Int96::from(vec![2, 20, 30]),
2723 ]
2724 .into_iter()
2725 .collect::<Vec<Int96>>();
2726
2727 let stats = statistics_roundtrip::<Int96Type>(&input);
2728 assert!(!stats.is_min_max_backwards_compatible());
2729 if let Statistics::Int96(stats) = stats {
2730 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2731 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2732 } else {
2733 panic!("expecting Statistics::Int96, got {stats:?}");
2734 }
2735 }
2736
    /// Float statistics round-trip: min/max over a small signed batch.
    #[test]
    fn test_float_statistics() {
        let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2.0);
            assert_eq!(stats.max_opt().unwrap(), &3.0);
        } else {
            panic!("expecting Statistics::Float, got {stats:?}");
        }
    }
2748
    /// Double statistics round-trip: min/max over a small signed batch.
    #[test]
    fn test_double_statistics() {
        let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2.0);
            assert_eq!(stats.max_opt().unwrap(), &3.0);
        } else {
            panic!("expecting Statistics::Double, got {stats:?}");
        }
    }
2760
2761 #[test]
2762 fn test_byte_array_statistics() {
2763 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2764 .iter()
2765 .map(|&s| s.into())
2766 .collect::<Vec<_>>();
2767
2768 let stats = statistics_roundtrip::<ByteArrayType>(&input);
2769 assert!(!stats.is_min_max_backwards_compatible());
2770 if let Statistics::ByteArray(stats) = stats {
2771 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2772 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2773 } else {
2774 panic!("expecting Statistics::ByteArray, got {stats:?}");
2775 }
2776 }
2777
    /// FixedLenByteArray statistics round-trip: min/max over byte strings.
    #[test]
    fn test_fixed_len_byte_array_statistics() {
        let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
            .iter()
            .map(|&s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
        // Fixed-len byte arrays do not set the deprecated min/max fields.
        assert!(!stats.is_min_max_backwards_compatible());
        if let Statistics::FixedLenByteArray(stats) = stats {
            let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
            assert_eq!(stats.min_opt().unwrap(), &expected_min);
            let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
            assert_eq!(stats.max_opt().unwrap(), &expected_max);
        } else {
            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
        }
    }
2796
    /// Float16 (stored as fixed-len byte arrays) min/max over a signed batch.
    #[test]
    fn test_column_writer_check_float16_min_max() {
        let input = [
            -f16::ONE,
            f16::from_f32(3.0),
            -f16::from_f32(2.0),
            f16::from_f32(2.0),
        ]
        .into_iter()
        .map(|s| ByteArray::from(s).into())
        .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(
            stats.min_opt().unwrap(),
            &ByteArray::from(-f16::from_f32(2.0))
        );
        assert_eq!(
            stats.max_opt().unwrap(),
            &ByteArray::from(f16::from_f32(3.0))
        );
    }
2820
    /// A NaN in the middle of a float16 batch does not affect min/max;
    /// min is 1.0 and max is 2.0 as if the NaN were absent.
    #[test]
    fn test_column_writer_check_float16_nan_middle() {
        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
        assert_eq!(
            stats.max_opt().unwrap(),
            &ByteArray::from(f16::ONE + f16::ONE)
        );
    }
2836
    /// NaN in the middle of a float16 batch is ignored for min/max.
    // NOTE(review): this test is an exact duplicate of
    // `test_column_writer_check_float16_nan_middle` above — consider removing
    // one of the two.
    #[test]
    fn test_float16_statistics_nan_middle() {
        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
        assert_eq!(
            stats.max_opt().unwrap(),
            &ByteArray::from(f16::ONE + f16::ONE)
        );
    }
2852
    /// A NaN at the start of a float16 batch is ignored for min/max.
    #[test]
    fn test_float16_statistics_nan_start() {
        let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
        assert_eq!(
            stats.max_opt().unwrap(),
            &ByteArray::from(f16::ONE + f16::ONE)
        );
    }
2868
    /// An all-NaN float16 batch records no min/max bytes at all.
    #[test]
    fn test_float16_statistics_nan_only() {
        let input = [f16::NAN, f16::NAN]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.min_bytes_opt().is_none());
        assert!(stats.max_bytes_opt().is_none());
        assert!(stats.is_min_max_backwards_compatible());
    }
2881
    /// A lone +0.0 widens the recorded bounds to [-0.0, +0.0].
    #[test]
    fn test_float16_statistics_zero_only() {
        let input = [f16::ZERO]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
    }
2894
    /// A lone -0.0 also widens the recorded bounds to [-0.0, +0.0].
    #[test]
    fn test_float16_statistics_neg_zero_only() {
        let input = [f16::NEG_ZERO]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
    }
2907
    /// When +0.0 is the minimum, the recorded min is rewritten to -0.0;
    /// the NaN in the batch is ignored for the max.
    #[test]
    fn test_float16_statistics_zero_min() {
        let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
    }
2920
    /// When -0.0 is the maximum, the recorded max is rewritten to +0.0;
    /// the NaN in the batch is ignored for the min.
    #[test]
    fn test_float16_statistics_neg_zero_max() {
        let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
        assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
    }
2933
    /// A NaN in the middle of an f32 batch is ignored for min/max.
    #[test]
    fn test_float_statistics_nan_middle() {
        let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &1.0);
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Float");
        }
    }
2945
    /// A NaN at the start of an f32 batch is ignored for min/max.
    #[test]
    fn test_float_statistics_nan_start() {
        let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &1.0);
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Float");
        }
    }
2957
2958 #[test]
2959 fn test_float_statistics_nan_only() {
2960 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2961 assert!(stats.min_bytes_opt().is_none());
2962 assert!(stats.max_bytes_opt().is_none());
2963 assert!(stats.is_min_max_backwards_compatible());
2964 assert!(matches!(stats, Statistics::Float(_)));
2965 }
2966
    /// A lone +0.0 widens the f32 bounds to [-0.0, +0.0], with correct signs.
    #[test]
    fn test_float_statistics_zero_only() {
        let stats = statistics_roundtrip::<FloatType>(&[0.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Float");
        }
    }
2980
    /// A lone -0.0 also widens the f32 bounds to [-0.0, +0.0].
    #[test]
    fn test_float_statistics_neg_zero_only() {
        let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Float");
        }
    }
2994
    /// A +0.0 minimum is rewritten to -0.0; the NaN is ignored for the max.
    #[test]
    fn test_float_statistics_zero_min() {
        let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Float");
        }
    }
3007
    /// A -0.0 maximum is rewritten to +0.0; the NaN is ignored for the min.
    #[test]
    fn test_float_statistics_neg_zero_max() {
        let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Float(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2.0);
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Float");
        }
    }
3020
    /// A NaN in the middle of an f64 batch is ignored for min/max.
    #[test]
    fn test_double_statistics_nan_middle() {
        let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &1.0);
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Double");
        }
    }
3032
    /// A NaN at the start of an f64 batch is ignored for min/max.
    #[test]
    fn test_double_statistics_nan_start() {
        let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &1.0);
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Double");
        }
    }
3044
3045 #[test]
3046 fn test_double_statistics_nan_only() {
3047 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
3048 assert!(stats.min_bytes_opt().is_none());
3049 assert!(stats.max_bytes_opt().is_none());
3050 assert!(matches!(stats, Statistics::Double(_)));
3051 assert!(stats.is_min_max_backwards_compatible());
3052 }
3053
    /// A lone +0.0 widens the f64 bounds to [-0.0, +0.0], with correct signs.
    #[test]
    fn test_double_statistics_zero_only() {
        let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Double");
        }
    }
3067
    /// A lone -0.0 also widens the f64 bounds to [-0.0, +0.0].
    #[test]
    fn test_double_statistics_neg_zero_only() {
        let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Double");
        }
    }
3081
    /// A +0.0 minimum is rewritten to -0.0; the NaN is ignored for the max.
    #[test]
    fn test_double_statistics_zero_min() {
        let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-0.0);
            assert!(stats.min_opt().unwrap().is_sign_negative());
            assert_eq!(stats.max_opt().unwrap(), &2.0);
        } else {
            panic!("expecting Statistics::Double");
        }
    }
3094
    /// A -0.0 maximum is rewritten to +0.0; the NaN is ignored for the min.
    #[test]
    fn test_double_statistics_neg_zero_max() {
        let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
        assert!(stats.is_min_max_backwards_compatible());
        if let Statistics::Double(stats) = stats {
            assert_eq!(stats.min_opt().unwrap(), &-2.0);
            assert_eq!(stats.max_opt().unwrap(), &0.0);
            assert!(stats.max_opt().unwrap().is_sign_positive());
        } else {
            panic!("expecting Statistics::Double");
        }
    }
3107
3108 #[test]
3109 fn test_compare_greater_byte_array_decimals() {
3110 assert!(!compare_greater_byte_array_decimals(&[], &[],),);
3111 assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
3112 assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
3113 assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
3114 assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
3115 assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
3116 assert!(!compare_greater_byte_array_decimals(
3117 &[0u8, 1u8,],
3118 &[1u8, 0u8,],
3119 ),);
3120 assert!(!compare_greater_byte_array_decimals(
3121 &[255u8, 35u8, 0u8, 0u8,],
3122 &[0u8,],
3123 ),);
3124 assert!(compare_greater_byte_array_decimals(
3125 &[0u8,],
3126 &[255u8, 35u8, 0u8, 0u8,],
3127 ),);
3128 }
3129
    /// An all-null page (no values, def levels all zero) produces a column
    /// index entry with null-page flag set, no min/max, a null count, and a
    /// definition-level histogram but no repetition-level histogram.
    #[test]
    fn test_column_index_with_null_pages() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        // max_def_level = 1 so def level 0 means null.
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

        let r = writer.close().unwrap();
        assert!(r.column_index.is_some());
        let col_idx = r.column_index.unwrap();
        let col_idx = match col_idx {
            ColumnIndexMetaData::INT32(col_idx) => col_idx,
            _ => panic!("wrong stats type"),
        };
        assert!(col_idx.is_null_page(0));
        assert!(col_idx.min_value(0).is_none());
        assert!(col_idx.max_value(0).is_none());
        assert!(col_idx.null_count(0).is_some());
        assert_eq!(col_idx.null_count(0), Some(4));
        // max_rep_level is 0, so no repetition-level histogram is recorded.
        assert!(col_idx.repetition_level_histogram(0).is_none());
        assert!(col_idx.definition_level_histogram(0).is_some());
        // All four def levels are 0 (null); none reach level 1.
        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
    }
3159
3160 #[test]
3161 fn test_column_offset_index_metadata() {
3162 let page_writer = get_test_page_writer();
3165 let props = Default::default();
3166 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3167 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3168 writer.flush_data_pages().unwrap();
3170 writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3172
3173 let r = writer.close().unwrap();
3174 let column_index = r.column_index.unwrap();
3175 let offset_index = r.offset_index.unwrap();
3176
3177 assert_eq!(8, r.rows_written);
3178
3179 let column_index = match column_index {
3181 ColumnIndexMetaData::INT32(column_index) => column_index,
3182 _ => panic!("wrong stats type"),
3183 };
3184 assert_eq!(2, column_index.num_pages());
3185 assert_eq!(2, offset_index.page_locations.len());
3186 assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3187 for idx in 0..2 {
3188 assert!(!column_index.is_null_page(idx));
3189 assert_eq!(0, column_index.null_count(0).unwrap());
3190 }
3191
3192 if let Some(stats) = r.metadata.statistics() {
3193 assert_eq!(stats.null_count_opt(), Some(0));
3194 assert_eq!(stats.distinct_count_opt(), None);
3195 if let Statistics::Int32(stats) = stats {
3196 assert_eq!(stats.min_opt(), column_index.min_value(1));
3200 assert_eq!(stats.max_opt(), column_index.max_value(1));
3201 } else {
3202 panic!("expecting Statistics::Int32");
3203 }
3204 } else {
3205 panic!("metadata missing statistics");
3206 }
3207
3208 assert_eq!(0, offset_index.page_locations[0].first_row_index);
3210 assert_eq!(4, offset_index.page_locations[1].first_row_index);
3211 }
3212
    /// Column-index min/max values are truncated to the default column-index
    /// truncate length even when chunk-statistics truncation is disabled.
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        let page_writer = get_test_page_writer();
        // Disable chunk statistics truncation; the column index still applies
        // DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH (asserted below).
        let props = WriterProperties::builder()
            .set_statistics_truncate_length(None)
            .build()
            .into();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        // Three 200-byte values, each a single repeated ASCII byte:
        // 97 = 'a', 112 = 'p', 98 = 'b'.
        let mut data = vec![FixedLenByteArray::default(); 3];
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(3, r.rows_written);

        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                // Truncated index values differ from the untruncated chunk stats.
                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);

                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value, &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // The truncated max is rounded up: its last byte is one more
                // than the repeated byte it started with.
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3287
    /// Spec-style truncation example: with a 1-byte column-index truncate
    /// length, "Blart Versenwald III" truncates to min "B" and max "C"
    /// (the max's last byte is rounded up to stay an upper bound).
    #[test]
    fn test_column_offset_index_truncating_spec_example() {
        let page_writer = get_test_page_writer();

        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 1];
        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(1, r.rows_written);

        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(_stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                assert_eq!(column_index_min_value.len(), 1);
                assert_eq!(column_index_max_value.len(), 1);

                // Min truncates down; max truncates and rounds up.
                assert_eq!("B".as_bytes(), column_index_min_value);
                assert_eq!("C".as_bytes(), column_index_max_value);

                // Chunk statistics are not truncated here, so they differ.
                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3347
    /// FLOAT16 logical-type min/max must never be truncated, even when the
    /// column-index truncate length (1 byte here) is shorter than the value.
    #[test]
    fn test_float16_min_max_no_truncation() {
        // Request 1-byte truncation — must be ignored for Float16 columns.
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, props);

        let expected_value = f16::PI.to_le_bytes().to_vec();
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // Column index min/max keep the full 2-byte value.
        let column_index = r.column_index.unwrap();
        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };
        let column_index_min_bytes = column_index.min_value(0).unwrap();
        let column_index_max_bytes = column_index.max_value(0).unwrap();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // Chunk statistics are likewise untruncated.
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }
3386
    /// Decimal (FIXED_LEN_BYTE_ARRAY) min/max must never be truncated, even
    /// when the column-index truncate length (1 byte here) is much shorter
    /// than the 16-byte value.
    #[test]
    fn test_decimal_min_max_no_truncation() {
        // Request 1-byte truncation — must be ignored for decimal columns.
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer =
            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let expected_value = vec![
            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
            231u8, 90u8, 0u8, 0u8,
        ];
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // Column index min/max keep the full 16-byte value.
        let column_index = r.column_index.unwrap();
        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };
        let column_index_min_bytes = column_index.min_value(0).unwrap();
        let column_index_max_bytes = column_index.max_value(0).unwrap();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // Chunk statistics are likewise untruncated.
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }
3429
    /// With default writer properties, BYTE_ARRAY chunk statistics are
    /// truncated to 64 bytes, with the max's last byte incremented so it
    /// remains an upper bound of the original value.
    #[test]
    fn test_statistics_truncating_byte_array_default() {
        let page_writer = get_test_page_writer();

        // Default properties: default statistics truncate length applies.
        let props = WriterProperties::builder().build().into();
        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![ByteArray::default(); 1];
        data[0].set_data(Bytes::from(String::from(
            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
        )));
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        if let Statistics::ByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            // Truncation marks both bounds as inexact.
            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            // Both bounds are cut to the 64-byte default.
            let expected_len = 64;
            assert_eq!(min_value.len(), expected_len);
            assert_eq!(max_value.len(), expected_len);

            // Min is the plain 64-byte prefix; max is the prefix with its
            // last byte incremented ("...certainly" -> "...certainlz").
            let expected_min =
                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
            assert_eq!(expected_min, min_value.as_bytes());
            let expected_max =
                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
            assert_eq!(expected_max, max_value.as_bytes());
        } else {
            panic!("expecting Statistics::ByteArray");
        }
    }
3472
    /// BYTE_ARRAY chunk statistics honor an explicit truncate length:
    /// with length 1, min is the first byte and max is that byte incremented.
    #[test]
    fn test_statistics_truncating_byte_array() {
        let page_writer = get_test_page_writer();

        const TEST_TRUNCATE_LENGTH: usize = 1;

        // Truncate statistics min/max to a single byte.
        let builder =
            WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![ByteArray::default(); 1];
        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        assert_eq!(stats.null_count_opt(), Some(0));
        assert_eq!(stats.distinct_count_opt(), None);
        if let Statistics::ByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            // Truncation marks both bounds as inexact.
            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            // "B" is the prefix of the value; "C" is the incremented bound.
            assert_eq!("B".as_bytes(), min_value.as_bytes());
            assert_eq!("C".as_bytes(), max_value.as_bytes());
        } else {
            panic!("expecting Statistics::ByteArray");
        }
    }
3516
3517 #[test]
3518 fn test_statistics_truncating_fixed_len_byte_array() {
3519 let page_writer = get_test_page_writer();
3520
3521 const TEST_TRUNCATE_LENGTH: usize = 1;
3522
3523 let builder =
3525 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3526 let props = Arc::new(builder.build());
3527 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3528
3529 let mut data = vec![FixedLenByteArray::default(); 1];
3530
3531 const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3532 const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3533
3534 const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3536 [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3537
3538 data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3540
3541 writer.write_batch(&data, None, None).unwrap();
3542
3543 writer.flush_data_pages().unwrap();
3544
3545 let r = writer.close().unwrap();
3546
3547 assert_eq!(1, r.rows_written);
3548
3549 let stats = r.metadata.statistics().expect("statistics");
3550 assert_eq!(stats.null_count_opt(), Some(0));
3551 assert_eq!(stats.distinct_count_opt(), None);
3552 if let Statistics::FixedLenByteArray(_stats) = stats {
3553 let min_value = _stats.min_opt().unwrap();
3554 let max_value = _stats.max_opt().unwrap();
3555
3556 assert!(!_stats.min_is_exact());
3557 assert!(!_stats.max_is_exact());
3558
3559 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3560 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3561
3562 assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3563 assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3564
3565 let reconstructed_min = i128::from_be_bytes([
3566 min_value.as_bytes()[0],
3567 0,
3568 0,
3569 0,
3570 0,
3571 0,
3572 0,
3573 0,
3574 0,
3575 0,
3576 0,
3577 0,
3578 0,
3579 0,
3580 0,
3581 0,
3582 ]);
3583
3584 let reconstructed_max = i128::from_be_bytes([
3585 max_value.as_bytes()[0],
3586 0,
3587 0,
3588 0,
3589 0,
3590 0,
3591 0,
3592 0,
3593 0,
3594 0,
3595 0,
3596 0,
3597 0,
3598 0,
3599 0,
3600 0,
3601 ]);
3602
3603 println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3605 assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3606 println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3607 assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3608 } else {
3609 panic!("expecting Statistics::FixedLenByteArray");
3610 }
3611 }
3612
3613 #[test]
3614 fn test_send() {
3615 fn test<T: Send>() {}
3616 test::<ColumnWriterImpl<Int32Type>>();
3617 }
3618
3619 #[test]
3620 fn test_increment() {
3621 let v = increment(vec![0, 0, 0]).unwrap();
3622 assert_eq!(&v, &[0, 0, 1]);
3623
3624 let v = increment(vec![0, 255, 255]).unwrap();
3626 assert_eq!(&v, &[1, 0, 0]);
3627
3628 let v = increment(vec![255, 255, 255]);
3630 assert!(v.is_none());
3631 }
3632
    /// `increment_utf8` must produce valid UTF-8 that is strictly greater
    /// than its input (as a string and byte-wise), or `None` when no such
    /// string exists within the same byte budget.
    #[test]
    fn test_increment_utf8() {
        // Assert `increment_utf8(o)` equals `expected`, is strictly greater
        // as a string, and also compares greater byte-wise as a ByteArray.
        let test_inc = |o: &str, expected: &str| {
            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
                assert_eq!(v, expected);
                assert!(*v > *o);
                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(v));
                let mut original = ByteArray::new();
                original.set_data(Bytes::from(o.as_bytes().to_vec()));
                assert!(greater > original);
            } else {
                panic!("Expected incremented UTF8 string to also be valid.");
            }
        };

        // Basic ASCII case: last char bumps.
        test_inc("hello", "hellp");

        // Last char is the 1-byte maximum (U+007F): it is dropped and the
        // previous char is incremented instead.
        test_inc("a\u{7f}", "b");

        // Every char is at the 1-byte maximum: increment is impossible.
        assert!(increment_utf8("\u{7f}\u{7f}").is_none());

        // Multi-byte emoji: only the final scalar value changes.
        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

        // 2-byte chars without overflow.
        test_inc("éééé", "éééê");

        // U+00FF increments to U+0100; both encode as 2 bytes.
        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

        // Last char is the 2-byte maximum (U+07FF): carry to previous char.
        test_inc("a\u{7ff}", "b");

        // Every char at the 2-byte maximum: increment is impossible.
        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

        // 3-byte chars without overflow.
        test_inc("ࠀࠀ", "ࠀࠁ");

        // Last char is the 3-byte maximum (U+FFFF): carry to previous char.
        test_inc("a\u{ffff}", "b");

        // Every char at the 3-byte maximum: increment is impossible.
        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

        // 4-byte chars without overflow.
        test_inc("𐀀𐀀", "𐀀𐀁");

        // Last char is the maximum scalar (U+10FFFF): carry to previous char.
        test_inc("a\u{10ffff}", "b");

        // Every char at the maximum scalar: increment is impossible.
        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

        // U+D7FF + 1 would land in the surrogate range, so the char is
        // dropped and the previous one incremented instead.
        test_inc("a\u{D7FF}", "b");
    }
3699
    /// UTF-8 truncation must cut on character boundaries, and
    /// truncate-and-increment must yield a valid, strictly greater string
    /// (or `None` when impossible within the byte budget).
    #[test]
    fn test_truncate_utf8() {
        // No-op when the limit equals the full byte length.
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // 13 bytes falls mid-character; the cut backs up to the nearest
        // boundary, leaving 10 bytes ("❤️" is 6 bytes, "🧡" is 4).
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // A multi-byte character cannot fit in a 1-byte budget at all.
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // ASCII: truncate to 8 bytes and increment the last byte.
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte chars: a 7-byte budget holds three chars (6 bytes).
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // U+00FF increments to U+0100; both are 2 bytes, so it fits.
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // U+07FF is the 2-byte maximum; no in-budget increment exists.
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte chars within budget.
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // U+FFFF is the 3-byte maximum; no in-budget increment exists.
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte chars within budget.
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // U+10FFFF is the maximum scalar; no in-budget increment exists.
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
3751
    /// A UTF8 column holding invalid UTF-8 bytes falls back to binary
    /// statistics truncation: the max is cut to 8 bytes with its last byte
    /// incremented so it still bounds the data.
    #[test]
    fn test_byte_array_truncate_invalid_utf8_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        // 0x80 is a lone continuation byte, so these values are invalid
        // UTF-8; def levels mark 7 present values among 10 slots.
        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .set_statistics_truncate_length(Some(8))
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();
        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
        assert!(!stats.max_is_exact());
        // Binary truncation: 8-byte prefix with the final byte incremented
        // (128 -> 129) to keep it an upper bound.
        assert_eq!(
            stats.max_bytes_opt().map(|v| v.to_vec()),
            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
        );
    }
3794
3795 #[test]
3796 fn test_increment_max_binary_chars() {
3797 let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3798 assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3799
3800 let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3801 assert!(incremented.is_none())
3802 }
3803
3804 #[test]
3805 fn test_no_column_index_when_stats_disabled() {
3806 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3810 let props = Arc::new(
3811 WriterProperties::builder()
3812 .set_statistics_enabled(EnabledStatistics::None)
3813 .build(),
3814 );
3815 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3816 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3817
3818 let data = Vec::new();
3819 let def_levels = vec![0; 10];
3820 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3821 writer.flush_data_pages().unwrap();
3822
3823 let column_close_result = writer.close().unwrap();
3824 assert!(column_close_result.offset_index.is_some());
3825 assert!(column_close_result.column_index.is_none());
3826 }
3827
3828 #[test]
3829 fn test_no_offset_index_when_disabled() {
3830 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3832 let props = Arc::new(
3833 WriterProperties::builder()
3834 .set_statistics_enabled(EnabledStatistics::None)
3835 .set_offset_index_disabled(true)
3836 .build(),
3837 );
3838 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3839 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3840
3841 let data = Vec::new();
3842 let def_levels = vec![0; 10];
3843 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3844 writer.flush_data_pages().unwrap();
3845
3846 let column_close_result = writer.close().unwrap();
3847 assert!(column_close_result.offset_index.is_none());
3848 assert!(column_close_result.column_index.is_none());
3849 }
3850
3851 #[test]
3852 fn test_offset_index_overridden() {
3853 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3855 let props = Arc::new(
3856 WriterProperties::builder()
3857 .set_statistics_enabled(EnabledStatistics::Page)
3858 .set_offset_index_disabled(true)
3859 .build(),
3860 );
3861 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3862 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3863
3864 let data = Vec::new();
3865 let def_levels = vec![0; 10];
3866 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3867 writer.flush_data_pages().unwrap();
3868
3869 let column_close_result = writer.close().unwrap();
3870 assert!(column_close_result.offset_index.is_some());
3871 assert!(column_close_result.column_index.is_some());
3872 }
3873
    /// `boundary_order` detection across pages: monotonic page min/max
    /// bounds yield ASCENDING/DESCENDING, all-null pages are skipped, and
    /// conflicting directions yield UNORDERED.
    #[test]
    fn test_boundary_order() -> Result<()> {
        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
        // Mins and maxes both non-decreasing (null page skipped) -> ascending.
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(-10), Some(10)],
                &[Some(-5), Some(11)],
                &[None],
                &[Some(-5), Some(11)],
            ],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // Mins and maxes both non-increasing -> descending.
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(5), Some(11)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));

        // Identical page bounds count as ascending.
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // Only null pages -> reported as ascending.
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // A single page is trivially ascending.
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // One data page followed by a null page is still ascending.
        let column_close_result =
            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));

        // Bounds rise then fall across pages -> unordered.
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(11), Some(16)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        // Mins ascend while maxes descend -> unordered.
        let column_close_result = write_multiple_pages::<Int32Type>(
            &descr,
            &[
                &[Some(1), Some(9)],
                &[Some(2), Some(8)],
                &[None],
                &[Some(3), Some(7)],
            ],
        )?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        Ok(())
    }
3981
    /// Boundary order must respect the column's logical-type sort order:
    /// FLOAT16 pages written in decreasing numeric order are DESCENDING,
    /// while the very same bytes in a plain FIXED_LEN_BYTE_ARRAY column
    /// (byte-wise comparison) come out UNORDERED.
    #[test]
    fn test_boundary_order_logical_type() -> Result<()> {
        // A Float16 descriptor and a plain 2-byte FLBA descriptor.
        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
        let fba_descr = {
            let tpe = SchemaType::primitive_type_builder(
                "col",
                FixedLenByteArrayType::get_physical_type(),
            )
            .with_length(2)
            .build()?;
            Arc::new(ColumnDescriptor::new(
                Arc::new(tpe),
                1,
                0,
                ColumnPath::from("col"),
            ))
        };

        // One page per value: 1.0, 0.0, -0.0, -1.0 (numerically decreasing).
        let values: &[&[Option<FixedLenByteArray>]] = &[
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(
                f16::NEG_ZERO,
            )))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
        ];

        // Interpreted as Float16, the page bounds are strictly descending.
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));

        // Interpreted as raw bytes, the negative values' sign bit breaks the
        // ordering.
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        Ok(())
    }
4031
4032 #[test]
4033 fn test_interval_stats_should_not_have_min_max() {
4034 let input = [
4035 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
4036 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
4037 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
4038 ]
4039 .into_iter()
4040 .map(|s| ByteArray::from(s).into())
4041 .collect::<Vec<_>>();
4042
4043 let page_writer = get_test_page_writer();
4044 let mut writer = get_test_interval_column_writer(page_writer);
4045 writer.write_batch(&input, None, None).unwrap();
4046
4047 let metadata = writer.close().unwrap().metadata;
4048 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4049 stats.clone()
4050 } else {
4051 panic!("metadata missing statistics");
4052 };
4053 assert!(stats.min_bytes_opt().is_none());
4054 assert!(stats.max_bytes_opt().is_none());
4055 }
4056
    /// `get_estimated_total_bytes` starts at zero and grows as data pages
    /// are added.
    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        // Nothing buffered or flushed yet.
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        assert_eq!(size_with_one_page, 20);

        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // NOTE(review): second page estimate is 21 (one byte more than the
        // first) — presumably a page-header size difference; confirm if the
        // constant ever changes.
        assert_eq!(size_with_two_pages, 20 + 21);
    }
4076
4077 fn write_multiple_pages<T: DataType>(
4078 column_descr: &Arc<ColumnDescriptor>,
4079 pages: &[&[Option<T::T>]],
4080 ) -> Result<ColumnCloseResult> {
4081 let column_writer = get_column_writer(
4082 column_descr.clone(),
4083 Default::default(),
4084 get_test_page_writer(),
4085 );
4086 let mut writer = get_typed_column_writer::<T>(column_writer);
4087
4088 for &page in pages {
4089 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
4090 let def_levels = page
4091 .iter()
4092 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
4093 .collect::<Vec<_>>();
4094 writer.write_batch(&values, Some(&def_levels), None)?;
4095 writer.flush_data_pages()?;
4096 }
4097
4098 writer.close()
4099 }
4100
    /// Generate random values and (optionally) random def/rep levels, then
    /// run them through [`column_roundtrip`] to verify write/read symmetry.
    fn column_roundtrip_random<T: DataType>(
        props: WriterProperties,
        max_size: usize,
        min_value: T::T,
        max_value: T::T,
        max_def_level: i16,
        max_rep_level: i16,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        // Count of non-null entries, i.e. slots whose definition level
        // equals max_def_level.
        let mut num_values: usize = 0;

        let mut buf: Vec<i16> = Vec::new();
        let def_levels = if max_def_level > 0 {
            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
            for &dl in &buf[..] {
                if dl == max_def_level {
                    num_values += 1;
                }
            }
            Some(&buf[..])
        } else {
            // No definition levels: every slot holds a value.
            num_values = max_size;
            None
        };

        let mut buf: Vec<i16> = Vec::new();
        let rep_levels = if max_rep_level > 0 {
            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
            // A batch must begin at a record boundary (rep level 0).
            buf[0] = 0;
            Some(&buf[..])
        } else {
            None
        };

        let mut values: Vec<T::T> = Vec::new();
        random_numbers_range(num_values, min_value, max_value, &mut values);

        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
    }
4144
    /// Write `values` (with optional def/rep levels) to a temp file, read
    /// them back through a serialized page reader, and verify that values,
    /// levels and the reported row count all survive the roundtrip.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        // Infer the max levels from the level slices themselves.
        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // The read batch must be large enough for values and both level
        // slices.
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // Release the borrow on `file` so the reader can take ownership.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Everything written must be read back unchanged.
        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Row count check: with rep levels, each level-0 entry starts a new
        // row; with only def levels, one row per level; otherwise one row
        // per value.
        if let Some(levels) = actual_rep_levels {
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
4239
4240 fn column_write_and_get_metadata<T: DataType>(
4243 props: WriterProperties,
4244 values: &[T::T],
4245 ) -> ColumnChunkMetaData {
4246 let page_writer = get_test_page_writer();
4247 let props = Arc::new(props);
4248 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4249 writer.write_batch(values, None, None).unwrap();
4250 writer.close().unwrap().metadata
4251 }
4252
4253 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4255 PageEncodingStats {
4256 page_type,
4257 encoding,
4258 count,
4259 }
4260 }
4261
    /// Write `data` with the given writer version and dictionary setting,
    /// then verify the chunk metadata: dictionary page offset, the set of
    /// encodings used, and the per-page encoding stats.
    fn check_encoding_write_support<T: DataType>(
        version: WriterVersion,
        dict_enabled: bool,
        data: &[T::T],
        dictionary_page_offset: Option<i64>,
        encodings: &[Encoding],
        page_encoding_stats: &[PageEncodingStats],
    ) {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(dict_enabled)
            .build();
        let meta = column_write_and_get_metadata::<T>(props, data);
        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
    }
4282
4283 fn get_test_column_writer<'a, T: DataType>(
4285 page_writer: Box<dyn PageWriter + 'a>,
4286 max_def_level: i16,
4287 max_rep_level: i16,
4288 props: WriterPropertiesPtr,
4289 ) -> ColumnWriterImpl<'a, T> {
4290 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4291 let column_writer = get_column_writer(descr, props, page_writer);
4292 get_typed_column_writer::<T>(column_writer)
4293 }
4294
4295 fn get_test_column_writer_with_path<'a, T: DataType>(
4296 page_writer: Box<dyn PageWriter + 'a>,
4297 max_def_level: i16,
4298 max_rep_level: i16,
4299 props: WriterPropertiesPtr,
4300 path: ColumnPath,
4301 ) -> ColumnWriterImpl<'a, T> {
4302 let descr = Arc::new(get_test_column_descr_with_path::<T>(
4303 max_def_level,
4304 max_rep_level,
4305 path,
4306 ));
4307 let column_writer = get_column_writer(descr, props, page_writer);
4308 get_typed_column_writer::<T>(column_writer)
4309 }
4310
4311 fn get_test_column_reader<T: DataType>(
4313 page_reader: Box<dyn PageReader>,
4314 max_def_level: i16,
4315 max_rep_level: i16,
4316 ) -> ColumnReaderImpl<T> {
4317 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4318 let column_reader = get_column_reader(descr, page_reader);
4319 get_typed_column_reader::<T>(column_reader)
4320 }
4321
    /// Standard test column descriptor named "col" for physical type `T`.
    fn get_test_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            // The length only matters for FIXED_LEN_BYTE_ARRAY columns;
            // it is a no-op for all other physical types.
            .with_length(1)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4336
    /// Like [`get_test_column_descr`], but the column is named after the
    /// supplied `path` instead of the default "col".
    fn get_test_column_descr_with_path<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
        path: ColumnPath,
    ) -> ColumnDescriptor {
        let name = path.string();
        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
            // The length only matters for FIXED_LEN_BYTE_ARRAY columns;
            // it is a no-op for all other physical types.
            .with_length(1)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4351
    /// Write `data` to a temp file as column `path`, then read it back and
    /// return the number of values in each data page produced.
    fn write_and_collect_page_values(
        path: ColumnPath,
        props: WriterPropertiesPtr,
        data: &[i32],
    ) -> Vec<u32> {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let mut writer =
            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the borrow on `file` so the reader can take ownership.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );

        // Collect per-page value counts; only data pages are expected.
        let mut values_per_page = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            assert_eq!(page.page_type(), PageType::DATA_PAGE);
            values_per_page.push(page.num_values());
        }

        values_per_page
    }
4389
4390 fn get_test_page_writer() -> Box<dyn PageWriter> {
4392 Box::new(TestPageWriter {})
4393 }
4394
4395 struct TestPageWriter {}
4396
4397 impl PageWriter for TestPageWriter {
4398 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4399 let mut res = PageWriteSpec::new();
4400 res.page_type = page.page_type();
4401 res.uncompressed_size = page.uncompressed_size();
4402 res.compressed_size = page.compressed_size();
4403 res.num_values = page.num_values();
4404 res.offset = 0;
4405 res.bytes_written = page.data().len() as u64;
4406 Ok(res)
4407 }
4408
4409 fn close(&mut self) -> Result<()> {
4410 Ok(())
4411 }
4412 }
4413
4414 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4416 let page_writer = get_test_page_writer();
4417 let props = Default::default();
4418 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4419 writer.write_batch(values, None, None).unwrap();
4420
4421 let metadata = writer.close().unwrap().metadata;
4422 if let Some(stats) = metadata.statistics() {
4423 stats.clone()
4424 } else {
4425 panic!("metadata missing statistics");
4426 }
4427 }
4428
4429 fn get_test_decimals_column_writer<T: DataType>(
4431 page_writer: Box<dyn PageWriter>,
4432 max_def_level: i16,
4433 max_rep_level: i16,
4434 props: WriterPropertiesPtr,
4435 ) -> ColumnWriterImpl<'static, T> {
4436 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4437 max_def_level,
4438 max_rep_level,
4439 ));
4440 let column_writer = get_column_writer(descr, props, page_writer);
4441 get_typed_column_writer::<T>(column_writer)
4442 }
4443
4444 fn get_test_decimals_column_descr<T: DataType>(
4446 max_def_level: i16,
4447 max_rep_level: i16,
4448 ) -> ColumnDescriptor {
4449 let path = ColumnPath::from("col");
4450 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4451 .with_length(16)
4452 .with_logical_type(Some(LogicalType::Decimal {
4453 scale: 2,
4454 precision: 3,
4455 }))
4456 .with_scale(2)
4457 .with_precision(3)
4458 .build()
4459 .unwrap();
4460 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4461 }
4462
4463 fn float16_statistics_roundtrip(
4464 values: &[FixedLenByteArray],
4465 ) -> ValueStatistics<FixedLenByteArray> {
4466 let page_writer = get_test_page_writer();
4467 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4468 writer.write_batch(values, None, None).unwrap();
4469
4470 let metadata = writer.close().unwrap().metadata;
4471 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4472 stats.clone()
4473 } else {
4474 panic!("metadata missing statistics");
4475 }
4476 }
4477
4478 fn get_test_float16_column_writer(
4479 page_writer: Box<dyn PageWriter>,
4480 props: WriterPropertiesPtr,
4481 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4482 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4483 let column_writer = get_column_writer(descr, props, page_writer);
4484 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4485 }
4486
4487 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4488 let path = ColumnPath::from("col");
4489 let tpe =
4490 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4491 .with_length(2)
4492 .with_logical_type(Some(LogicalType::Float16))
4493 .build()
4494 .unwrap();
4495 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4496 }
4497
4498 fn get_test_interval_column_writer(
4499 page_writer: Box<dyn PageWriter>,
4500 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4501 let descr = Arc::new(get_test_interval_column_descr());
4502 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4503 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4504 }
4505
4506 fn get_test_interval_column_descr() -> ColumnDescriptor {
4507 let path = ColumnPath::from("col");
4508 let tpe =
4509 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4510 .with_length(12)
4511 .with_converted_type(ConvertedType::INTERVAL)
4512 .build()
4513 .unwrap();
4514 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4515 }
4516
4517 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4519 page_writer: Box<dyn PageWriter + 'a>,
4520 max_def_level: i16,
4521 max_rep_level: i16,
4522 props: WriterPropertiesPtr,
4523 ) -> ColumnWriterImpl<'a, T> {
4524 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4525 max_def_level,
4526 max_rep_level,
4527 ));
4528 let column_writer = get_column_writer(descr, props, page_writer);
4529 get_typed_column_writer::<T>(column_writer)
4530 }
4531
4532 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4534 max_def_level: i16,
4535 max_rep_level: i16,
4536 ) -> ColumnDescriptor {
4537 let path = ColumnPath::from("col");
4538 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4539 .with_converted_type(ConvertedType::UINT_32)
4540 .build()
4541 .unwrap();
4542 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4543 }
4544
4545 #[test]
4546 fn test_page_v2_snappy_compression_fallback() {
4547 let page_writer = TestPageWriter {};
4549
4550 let props = WriterProperties::builder()
4552 .set_writer_version(WriterVersion::PARQUET_2_0)
4553 .set_dictionary_enabled(false)
4555 .set_compression(Compression::SNAPPY)
4556 .build();
4557
4558 let mut column_writer =
4559 get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4560
4561 let values = vec![ByteArray::from("a")];
4564
4565 column_writer.write_batch(&values, None, None).unwrap();
4566
4567 let result = column_writer.close().unwrap();
4568 assert_eq!(
4569 result.metadata.uncompressed_size(),
4570 result.metadata.compressed_size()
4571 );
4572 }
4573
    /// Round-trip harness for uniform level data: writes a batch with the
    /// configured def/rep levels through `write_batch_internal`, reads the
    /// chunk back, and asserts values and levels match the expectations.
    struct ColumnRoundTripUniform<'a, T: DataType> {
        // Writer properties used for the write half of the round trip.
        props: WriterProperties,
        // Values to write (non-null slots only).
        values: &'a [T::T],
        // Definition levels passed to the writer; `Absent` omits the stream.
        def_levels: LevelDataRef<'a>,
        // Repetition levels passed to the writer; `Absent` omits the stream.
        rep_levels: LevelDataRef<'a>,
        max_def_level: i16,
        max_rep_level: i16,
        // Values expected when reading back.
        expected_values: &'a [T::T],
        // Expected def levels; `None` skips the def-level assertion.
        expected_def_levels: Option<&'a [i16]>,
        // Expected rep levels; `None` skips the rep-level assertion.
        expected_rep_levels: Option<&'a [i16]>,
    }
4585
    impl<'a, T: DataType> ColumnRoundTripUniform<'a, T>
    where
        T::T: PartialEq + std::fmt::Debug,
    {
        /// Creates an empty harness: no values, `Absent` level streams,
        /// zero max levels, and no expectations.
        fn new() -> Self {
            Self {
                props: Default::default(),
                values: &[],
                def_levels: LevelDataRef::Absent,
                rep_levels: LevelDataRef::Absent,
                max_def_level: 0,
                max_rep_level: 0,
                expected_values: &[],
                expected_def_levels: None,
                expected_rep_levels: None,
            }
        }

        /// Sets the writer properties to use.
        fn with_props(mut self, props: WriterProperties) -> Self {
            self.props = props;
            self
        }

        /// Sets the values to write.
        fn with_values(mut self, values: &'a [T::T]) -> Self {
            self.values = values;
            self
        }

        /// Sets the definition-level data passed to the writer.
        fn with_def_levels(mut self, def_levels: LevelDataRef<'a>) -> Self {
            self.def_levels = def_levels;
            self
        }

        /// Sets the repetition-level data passed to the writer.
        fn with_rep_levels(mut self, rep_levels: LevelDataRef<'a>) -> Self {
            self.rep_levels = rep_levels;
            self
        }

        /// Sets the column's max definition level.
        fn with_max_def_level(mut self, max_def_level: i16) -> Self {
            self.max_def_level = max_def_level;
            self
        }

        /// Sets the column's max repetition level.
        fn with_max_rep_level(mut self, max_rep_level: i16) -> Self {
            self.max_rep_level = max_rep_level;
            self
        }

        /// Sets the values expected on read-back.
        fn with_expected_values(mut self, expected_values: &'a [T::T]) -> Self {
            self.expected_values = expected_values;
            self
        }

        /// Sets the def levels expected on read-back (enables the assertion).
        fn with_expected_def_levels(mut self, expected_def_levels: &'a [i16]) -> Self {
            self.expected_def_levels = Some(expected_def_levels);
            self
        }

        /// Sets the rep levels expected on read-back (enables the assertion).
        fn with_expected_rep_levels(mut self, expected_rep_levels: &'a [i16]) -> Self {
            self.expected_rep_levels = Some(expected_rep_levels);
            self
        }

        /// Executes the round trip: write to a temp file, read back, assert.
        ///
        /// Panics (via `unwrap`/`assert_eq!`) on any I/O failure or mismatch.
        fn run(self) {
            let mut file = tempfile::tempfile().unwrap();
            let mut write = TrackedWrite::new(&mut file);
            let page_writer = Box::new(SerializedPageWriter::new(&mut write));
            let mut writer = get_test_column_writer::<T>(
                page_writer,
                self.max_def_level,
                self.max_rep_level,
                Arc::new(self.props),
            );

            // write_batch_internal takes the level data as LevelDataRef,
            // which is the point of this harness (uniform level streams).
            writer
                .write_batch_internal(
                    self.values,
                    None,
                    self.def_levels,
                    self.rep_levels,
                    None,
                    None,
                    None,
                )
                .unwrap();
            let result = writer.close().unwrap();
            // Release the mutable borrow of `file` before the reader takes it.
            drop(write);

            let props = ReaderProperties::builder()
                .set_backward_compatible_lz4(false)
                .build();
            let page_reader = Box::new(
                SerializedPageReader::new_with_properties(
                    Arc::new(file),
                    &result.metadata,
                    result.rows_written as usize,
                    None,
                    Arc::new(props),
                )
                .unwrap(),
            );
            let mut reader =
                get_test_column_reader::<T>(page_reader, self.max_def_level, self.max_rep_level);

            // Batch size is driven by the def-level expectation when present
            // (levels may outnumber values if some slots are null).
            let batch_size = self
                .expected_def_levels
                .map_or(self.expected_values.len(), |l| l.len());
            let mut actual_values = Vec::with_capacity(batch_size);
            let mut actual_def = self
                .expected_def_levels
                .map(|_| Vec::with_capacity(batch_size));
            let mut actual_rep = self
                .expected_rep_levels
                .map(|_| Vec::with_capacity(batch_size));

            let (_, values_read, levels_read) = reader
                .read_records(
                    batch_size,
                    actual_def.as_mut(),
                    actual_rep.as_mut(),
                    &mut actual_values,
                )
                .unwrap();

            assert_eq!(&actual_values[..values_read], self.expected_values);
            if let Some(ref v) = actual_def {
                assert_eq!(&v[..levels_read], self.expected_def_levels.unwrap());
            }
            if let Some(ref v) = actual_rep {
                assert_eq!(&v[..levels_read], self.expected_rep_levels.unwrap());
            }
        }
    }
4721
4722 #[test]
4723 fn test_uniform_def_levels_all_null() {
4724 let max_def_level = 1;
4726 let count = 100;
4727 let expected_def_levels = vec![0i16; count];
4728 ColumnRoundTripUniform::<Int32Type>::new()
4729 .with_def_levels(LevelDataRef::Uniform { value: 0, count })
4730 .with_max_def_level(max_def_level)
4731 .with_expected_def_levels(&expected_def_levels)
4732 .run();
4733 }
4734
4735 #[test]
4736 fn test_uniform_def_levels_all_valid() {
4737 let max_def_level = 1;
4739 let values: Vec<i32> = (0..50).collect();
4740 let expected_def_levels = vec![max_def_level; values.len()];
4741 ColumnRoundTripUniform::<Int32Type>::new()
4742 .with_values(&values)
4743 .with_def_levels(LevelDataRef::Uniform {
4744 value: max_def_level,
4745 count: values.len(),
4746 })
4747 .with_max_def_level(max_def_level)
4748 .with_expected_values(&values)
4749 .with_expected_def_levels(&expected_def_levels)
4750 .run();
4751 }
4752
4753 #[test]
4754 fn test_uniform_def_and_rep_levels() {
4755 let max_def_level = 2;
4758 let max_rep_level = 1;
4759 let count = 200;
4760 let expected_def_levels = vec![0i16; count];
4761 let expected_rep_levels = vec![0i16; count];
4762 ColumnRoundTripUniform::<Int32Type>::new()
4763 .with_def_levels(LevelDataRef::Uniform { value: 0, count })
4764 .with_rep_levels(LevelDataRef::Uniform { value: 0, count })
4765 .with_max_def_level(max_def_level)
4766 .with_max_rep_level(max_rep_level)
4767 .with_expected_def_levels(&expected_def_levels)
4768 .with_expected_rep_levels(&expected_rep_levels)
4769 .run();
4770 }
4771
4772 #[test]
4773 fn test_uniform_levels_v1_and_v2() {
4774 for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
4776 let props = WriterProperties::builder()
4777 .set_writer_version(version)
4778 .build();
4779 let max_def = 1;
4780 let count = 100;
4781 let expected_def_levels = vec![0i16; count];
4782 ColumnRoundTripUniform::<Int32Type>::new()
4783 .with_props(props)
4784 .with_def_levels(LevelDataRef::Uniform { value: 0, count })
4785 .with_max_def_level(max_def)
4786 .with_expected_def_levels(&expected_def_levels)
4787 .run();
4788 }
4789 }
4790}