1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30 BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, IntType, LogicalType,
31 PageType, Type,
32};
33use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
34use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
35use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
36use crate::data_type::private::ParquetValueType;
37use crate::data_type::*;
38use crate::encodings::levels::LevelEncoder;
39#[cfg(feature = "encryption")]
40use crate::encryption::encrypt::get_column_crypto_metadata;
41use crate::errors::{ParquetError, Result};
42use crate::file::metadata::{
43 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
44 OffsetIndexBuilder, PageEncodingStats,
45};
46use crate::file::properties::{
47 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
48};
49use crate::file::statistics::{Statistics, ValueStatistics};
50use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
51
52mod byte_budget_chunker;
53pub(crate) mod encoder;
54
55use byte_budget_chunker::ByteBudgetChunker;
56
// Dispatches `$body` over every variant of `ColumnWriter`, binding the wrapped
// typed writer to the identifier `$writer` inside each arm.
//
// The expansion names the variants through `Self::`, so it can only be invoked
// from within an `impl` block whose `Self` has exactly these variants.
macro_rules! downcast_writer {
    ($target:expr, $writer:ident, $body:expr) => {
        match $target {
            Self::BoolColumnWriter($writer) => $body,
            Self::Int32ColumnWriter($writer) => $body,
            Self::Int64ColumnWriter($writer) => $body,
            Self::Int96ColumnWriter($writer) => $body,
            Self::FloatColumnWriter($writer) => $body,
            Self::DoubleColumnWriter($writer) => $body,
            Self::ByteArrayColumnWriter($writer) => $body,
            Self::FixedLenByteArrayColumnWriter($writer) => $body,
        }
    };
}
71
/// Column writer for a Parquet column, with one variant per physical type.
///
/// Each variant wraps a [`ColumnWriterImpl`] specialised for the matching data
/// type. [`get_column_writer`] picks the variant from a column descriptor's
/// physical type.
pub enum ColumnWriter<'a> {
    /// Column writer for the BOOLEAN physical type.
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for the INT32 physical type.
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for the INT64 physical type.
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for the INT96 physical type.
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for the FLOAT physical type.
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for the DOUBLE physical type.
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for the BYTE_ARRAY physical type.
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for the FIXED_LEN_BYTE_ARRAY physical type.
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
93
94impl ColumnWriter<'_> {
95 #[cfg(feature = "arrow")]
97 pub(crate) fn memory_size(&self) -> usize {
98 downcast_writer!(self, typed, typed.memory_size())
99 }
100
101 #[cfg(feature = "arrow")]
103 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
104 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
105 }
106
107 #[cfg(feature = "arrow")]
112 pub(crate) fn add_data_page(&mut self) -> Result<()> {
113 downcast_writer!(self, typed, typed.add_data_page())
114 }
115
116 pub fn close(self) -> Result<ColumnCloseResult> {
118 downcast_writer!(self, typed, typed.close())
119 }
120}
121
122pub fn get_column_writer<'a>(
124 descr: ColumnDescPtr,
125 props: WriterPropertiesPtr,
126 page_writer: Box<dyn PageWriter + 'a>,
127) -> ColumnWriter<'a> {
128 match descr.physical_type() {
129 Type::BOOLEAN => {
130 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
131 }
132 Type::INT32 => {
133 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
134 }
135 Type::INT64 => {
136 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
137 }
138 Type::INT96 => {
139 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
140 }
141 Type::FLOAT => {
142 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
143 }
144 Type::DOUBLE => {
145 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
146 }
147 Type::BYTE_ARRAY => {
148 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
149 }
150 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
151 ColumnWriterImpl::new(descr, props, page_writer),
152 ),
153 }
154}
155
156pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
161 T::get_column_writer(col_writer).unwrap_or_else(|| {
162 panic!(
163 "Failed to convert column writer into a typed column writer for `{}` type",
164 T::get_physical_type()
165 )
166 })
167}
168
169pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
171 col_writer: &'b ColumnWriter<'a>,
172) -> &'b ColumnWriterImpl<'a, T> {
173 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
174 panic!(
175 "Failed to convert column writer into a typed column writer for `{}` type",
176 T::get_physical_type()
177 )
178 })
179}
180
181pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
183 col_writer: &'a mut ColumnWriter<'b>,
184) -> &'a mut ColumnWriterImpl<'b, T> {
185 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
186 panic!(
187 "Failed to convert column writer into a typed column writer for `{}` type",
188 T::get_physical_type()
189 )
190 })
191}
192
/// Result returned when a column writer is closed.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written by the column writer.
    pub bytes_written: u64,
    /// Total number of rows written by the column writer.
    pub rows_written: u64,
    /// Metadata describing the written column chunk.
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter for the column chunk, if the encoder produced one.
    pub bloom_filter: Option<Sbbf>,
    /// Column index for the chunk; `None` when the column index builder was
    /// invalid (e.g. page-level statistics disabled or unavailable).
    pub column_index: Option<ColumnIndexMetaData>,
    /// Offset index for the chunk; `None` when offset indexes are disabled.
    pub offset_index: Option<OffsetIndexMetaData>,
}
211
212impl ColumnCloseResult {
213 pub fn update_dictionary_location(mut self, dictionary_len: usize) -> Result<Self> {
224 if dictionary_len > 0 {
225 self.metadata = self
226 .metadata
227 .into_builder()
228 .set_dictionary_page_offset(Some(0))
229 .set_data_page_offset(dictionary_len as i64)
230 .build()?;
231 if let Some(offset_index) = self.offset_index.as_mut() {
232 let mut offset = dictionary_len as i64;
233 for location in offset_index.page_locations.iter_mut() {
234 location.offset = offset;
235 offset += location.compressed_page_size as i64;
236 }
237 }
238 }
239 Ok(self)
240 }
241}
242
/// Metrics accumulated for the data page currently being buffered.
#[derive(Default)]
struct PageMetrics {
    // Number of level slots (values including nulls) buffered for this page.
    num_buffered_values: u32,
    // Number of rows started in this page (slots with repetition level 0, or one
    // per slot for non-repeated columns).
    num_buffered_rows: u32,
    // Number of slots in this page whose definition level is below the maximum.
    num_page_nulls: u64,
    // Level histograms for this page; `None` when statistics are disabled or when
    // `LevelHistogram::try_new` declines to create one for the max level.
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
252
253impl PageMetrics {
254 fn new() -> Self {
255 Default::default()
256 }
257
258 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
260 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
261 self
262 }
263
264 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
266 self.definition_level_histogram = LevelHistogram::try_new(max_level);
267 self
268 }
269
270 fn new_page(&mut self) {
273 self.num_buffered_values = 0;
274 self.num_buffered_rows = 0;
275 self.num_page_nulls = 0;
276 self.repetition_level_histogram
277 .as_mut()
278 .map(LevelHistogram::reset);
279 self.definition_level_histogram
280 .as_mut()
281 .map(LevelHistogram::reset);
282 }
283}
284
/// Metrics accumulated over a whole column chunk.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    // Running totals for the chunk (bytes, rows, sizes, values).
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    // Offsets of the dictionary page and first data page, once known.
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    // Chunk-level min/max, folded from caller-supplied bounds and page statistics.
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    // Sum of the null counts of every flushed page.
    num_column_nulls: u64,
    // Caller-supplied distinct count; cleared once it can no longer be trusted.
    column_distinct_count: Option<u64>,
    // Sum of the pages' reported variable-length byte counts (`None` if no page
    // reported one).
    variable_length_bytes: Option<i64>,
    // Chunk-level level histograms, the sum of the page histograms.
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
303
304impl<T: Default> ColumnMetrics<T> {
305 fn new() -> Self {
306 Default::default()
307 }
308
309 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
311 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
312 self
313 }
314
315 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
317 self.definition_level_histogram = LevelHistogram::try_new(max_level);
318 self
319 }
320
321 fn update_histogram(
323 chunk_histogram: &mut Option<LevelHistogram>,
324 page_histogram: &Option<LevelHistogram>,
325 ) {
326 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
327 chunk_hist.add(page_hist);
328 }
329 }
330
331 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
334 ColumnMetrics::<T>::update_histogram(
335 &mut self.definition_level_histogram,
336 &page_metrics.definition_level_histogram,
337 );
338 ColumnMetrics::<T>::update_histogram(
339 &mut self.repetition_level_histogram,
340 &page_metrics.repetition_level_histogram,
341 );
342 }
343
344 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
346 if let Some(var_bytes) = variable_length_bytes {
347 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
348 }
349 }
350}
351
/// Borrowed view of a column's definition or repetition levels.
///
/// Levels may be supplied as an explicit slice, as a compact run of one repeated
/// level, or not at all. The type is `Copy`, so its methods take `self` by value.
#[derive(Debug, Clone, Copy)]
pub(crate) enum LevelDataRef<'a> {
    /// No levels were supplied.
    Absent,
    /// One level per slot, stored explicitly.
    Materialized(&'a [i16]),
    /// `count` consecutive slots that all share the level `value`.
    Uniform { value: i16, count: usize },
}

impl<'a> From<&'a [i16]> for LevelDataRef<'a> {
    /// Wraps an explicit level slice.
    fn from(levels: &'a [i16]) -> Self {
        LevelDataRef::Materialized(levels)
    }
}

impl<'a> From<Option<&'a [i16]>> for LevelDataRef<'a> {
    /// Maps `None` to [`LevelDataRef::Absent`] and `Some` to a materialized view.
    fn from(levels: Option<&'a [i16]>) -> Self {
        match levels {
            Some(slice) => LevelDataRef::Materialized(slice),
            None => LevelDataRef::Absent,
        }
    }
}

impl LevelDataRef<'_> {
    /// Number of level slots described; `0` for [`LevelDataRef::Absent`].
    pub(crate) fn len(self) -> usize {
        match self {
            Self::Materialized(levels) => levels.len(),
            Self::Uniform { count, .. } => count,
            Self::Absent => 0,
        }
    }

    /// First level, or `None` when there are no levels.
    pub(crate) fn first(self) -> Option<i16> {
        match self {
            Self::Materialized(levels) => levels.first().copied(),
            Self::Uniform { value, count } if count > 0 => Some(value),
            Self::Uniform { .. } | Self::Absent => None,
        }
    }

    /// Level at slot `idx`, or `None` when `idx` is out of range or levels are absent.
    #[cfg(feature = "arrow")]
    pub(crate) fn value_at(self, idx: usize) -> Option<i16> {
        match self {
            Self::Materialized(levels) => levels.get(idx).copied(),
            Self::Uniform { value, count } if idx < count => Some(value),
            Self::Uniform { .. } | Self::Absent => None,
        }
    }

    /// Returns the view covering slots `offset..offset + len`.
    ///
    /// # Panics
    ///
    /// For `Materialized`, panics if the range is out of bounds. `Uniform` performs
    /// no bounds check: the result is simply a run of `len` copies of its value.
    pub(crate) fn slice(self, offset: usize, len: usize) -> Self {
        match self {
            Self::Materialized(levels) => Self::Materialized(&levels[offset..offset + len]),
            Self::Uniform { value, .. } => Self::Uniform { value, count: len },
            Self::Absent => Self::Absent,
        }
    }

    /// Number of slots whose level equals `max_def`.
    ///
    /// For `Absent` levels every slot is counted, so `total` is returned.
    pub(crate) fn value_count(self, total: usize, max_def: i16) -> usize {
        match self {
            Self::Absent => total,
            Self::Materialized(levels) => levels
                .iter()
                .copied()
                .filter(|&level| level == max_def)
                .count(),
            Self::Uniform { value, count } if value == max_def => count,
            Self::Uniform { .. } => 0,
        }
    }
}
430
/// Typed column writer for data type `T`, using the default value encoder
/// [`ColumnValueEncoderImpl`].
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
433
/// Column writer for a single column chunk, generic over its value encoder `E`.
///
/// Buffers levels and values into data pages, optionally compresses them, and
/// hands them to a [`PageWriter`]. It also maintains chunk-level metrics, the
/// column index and the offset index.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Descriptor of the column being written (path, physical type, max levels).
    descr: ColumnDescPtr,
    // Writer configuration.
    props: WriterPropertiesPtr,
    // Statistics level for this column, resolved from `props` at construction.
    statistics_enabled: EnabledStatistics,

    // Destination for finished pages.
    page_writer: Box<dyn PageWriter + 'a>,
    // Compression configured for this column and the compressor created for it;
    // when `compressor` is `None`, page buffers are written uncompressed.
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    // Value encoder (also owns the dictionary and bloom filter state).
    encoder: E,

    // Metrics for the page currently being buffered.
    page_metrics: PageMetrics,
    // Metrics accumulated over the whole column chunk.
    column_metrics: ColumnMetrics<E::T>,

    // Encodings used in this chunk; seeded with RLE at construction.
    encodings: BTreeSet<Encoding>,
    // Per-page encoding statistics.
    encoding_stats: Vec<PageEncodingStats>,
    // Streaming encoders for definition and repetition levels.
    def_levels_encoder: LevelEncoder,
    rep_levels_encoder: LevelEncoder,
    // Compressed data pages held back until `flush_data_pages`, which runs after
    // the dictionary page (if any) has been written.
    data_pages: VecDeque<CompressedPage>,
    // Builder for the column index; invalidated when page statistics are not
    // enabled at page level or are unavailable for a page.
    column_index_builder: ColumnIndexBuilder,
    // Builder for the offset index; `None` when offset indexes are disabled.
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Whether min/max of the non-null data pages seen so far are still ascending
    // or descending; used to choose the column index boundary order on close.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    // Min/max of the most recent non-null data page that had statistics.
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
469
470impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
471 pub fn new(
473 descr: ColumnDescPtr,
474 props: WriterPropertiesPtr,
475 page_writer: Box<dyn PageWriter + 'a>,
476 ) -> Self {
477 let codec = props.compression(descr.path());
478 let codec_options = CodecOptionsBuilder::default().build();
479 let compressor = create_codec(codec, &codec_options).unwrap();
480 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
481
482 let statistics_enabled = props.statistics_enabled(descr.path());
483
484 let mut encodings = BTreeSet::new();
485 encodings.insert(Encoding::RLE);
487
488 let mut page_metrics = PageMetrics::new();
489 let mut column_metrics = ColumnMetrics::<E::T>::new();
490
491 if statistics_enabled != EnabledStatistics::None {
493 page_metrics = page_metrics
494 .with_repetition_level_histogram(descr.max_rep_level())
495 .with_definition_level_histogram(descr.max_def_level());
496 column_metrics = column_metrics
497 .with_repetition_level_histogram(descr.max_rep_level())
498 .with_definition_level_histogram(descr.max_def_level())
499 }
500
501 let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
503 if statistics_enabled != EnabledStatistics::Page {
504 column_index_builder.to_invalid()
505 }
506
507 let offset_index_builder = match props.offset_index_disabled() {
509 false => Some(OffsetIndexBuilder::new()),
510 _ => None,
511 };
512
513 Self {
514 def_levels_encoder: Self::create_level_encoder(descr.max_def_level(), &props),
515 rep_levels_encoder: Self::create_level_encoder(descr.max_rep_level(), &props),
516 descr,
517 props,
518 statistics_enabled,
519 page_writer,
520 codec,
521 compressor,
522 encoder,
523 data_pages: VecDeque::new(),
524 page_metrics,
525 column_metrics,
526 column_index_builder,
527 offset_index_builder,
528 encodings,
529 encoding_stats: vec![],
530 data_page_boundary_ascending: true,
531 data_page_boundary_descending: true,
532 last_non_null_data_page_min_max: None,
533 }
534 }
535
536 #[allow(clippy::too_many_arguments)]
537 pub(crate) fn write_batch_internal(
538 &mut self,
539 values: &E::Values,
540 value_indices: Option<&[usize]>,
541 def_levels: LevelDataRef<'_>,
542 rep_levels: LevelDataRef<'_>,
543 min: Option<&E::T>,
544 max: Option<&E::T>,
545 distinct_count: Option<u64>,
546 ) -> Result<usize> {
547 if def_levels.len() != 0 && rep_levels.len() != 0 && def_levels.len() != rep_levels.len() {
549 return Err(general_err!(
550 "Inconsistent length of definition and repetition levels: {} != {}",
551 def_levels.len(),
552 rep_levels.len()
553 ));
554 }
555
556 let num_levels = def_levels.len().max(rep_levels.len());
566 let num_levels = if num_levels > 0 {
567 num_levels
568 } else {
569 value_indices.map_or(values.len(), |i| i.len())
570 };
571
572 if let Some(min) = min {
573 update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
574 }
575 if let Some(max) = max {
576 update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
577 }
578
579 if self.encoder.num_values() == 0 {
581 self.column_metrics.column_distinct_count = distinct_count;
582 } else {
583 self.column_metrics.column_distinct_count = None;
584 }
585
586 let mut values_offset = 0;
587 let mut levels_offset = 0;
588 let both_levels_compact = !matches!(def_levels, LevelDataRef::Materialized(_))
589 && !matches!(rep_levels, LevelDataRef::Materialized(_));
590 let has_levels = !matches!(def_levels, LevelDataRef::Absent)
591 || !matches!(rep_levels, LevelDataRef::Absent);
592 let base_batch_size = if both_levels_compact && has_levels {
596 self.props.data_page_row_count_limit()
597 } else {
598 self.props.write_batch_size()
599 };
600 let chunker = ByteBudgetChunker::new(&self.descr, &self.props, base_batch_size);
601 while levels_offset < num_levels {
602 let mut end_offset = num_levels.min(levels_offset + base_batch_size);
603
604 if let LevelDataRef::Materialized(levels) = rep_levels {
606 while end_offset < levels.len() && levels[end_offset] != 0 {
607 end_offset += 1;
608 }
609 }
610
611 let chunk_size = end_offset - levels_offset;
612 let chunk_def = def_levels.slice(levels_offset, chunk_size);
613 let chunk_rep = rep_levels.slice(levels_offset, chunk_size);
614
615 let sub_batch_size = chunker.pick_sub_batch_size(
622 &self.encoder,
623 values,
624 value_indices,
625 chunk_def,
626 values_offset,
627 chunk_size,
628 );
629
630 if sub_batch_size >= chunk_size {
631 values_offset += self.write_mini_batch(
632 values,
633 values_offset,
634 value_indices,
635 chunk_size,
636 chunk_def,
637 chunk_rep,
638 )?;
639 } else {
640 values_offset += self.write_granular_chunk(
641 values,
642 values_offset,
643 value_indices,
644 chunk_size,
645 chunk_def,
646 chunk_rep,
647 sub_batch_size,
648 )?;
649 }
650 levels_offset = end_offset;
651 }
652
653 Ok(values_offset)
655 }
656
657 pub fn write_batch(
670 &mut self,
671 values: &E::Values,
672 def_levels: Option<&[i16]>,
673 rep_levels: Option<&[i16]>,
674 ) -> Result<usize> {
675 self.write_batch_internal(
676 values,
677 None,
678 LevelDataRef::from(def_levels),
679 LevelDataRef::from(rep_levels),
680 None,
681 None,
682 None,
683 )
684 }
685
686 pub fn write_batch_with_statistics(
694 &mut self,
695 values: &E::Values,
696 def_levels: Option<&[i16]>,
697 rep_levels: Option<&[i16]>,
698 min: Option<&E::T>,
699 max: Option<&E::T>,
700 distinct_count: Option<u64>,
701 ) -> Result<usize> {
702 self.write_batch_internal(
703 values,
704 None,
705 LevelDataRef::from(def_levels),
706 LevelDataRef::from(rep_levels),
707 min,
708 max,
709 distinct_count,
710 )
711 }
712
713 #[cfg(feature = "arrow")]
718 pub(crate) fn memory_size(&self) -> usize {
719 self.encoder.estimated_memory_size()
725 + self
726 .data_pages
727 .iter()
728 .map(|page| page.memory_usage())
729 .sum::<usize>()
730 + self.page_writer.buffered_memory_size()
731 }
732
733 pub fn get_total_bytes_written(&self) -> u64 {
739 self.column_metrics.total_bytes_written
740 }
741
742 #[cfg(feature = "arrow")]
748 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
749 self.data_pages
750 .iter()
751 .map(|page| page.data().len() as u64)
752 .sum::<u64>()
753 + self.column_metrics.total_bytes_written
754 + self.encoder.estimated_data_page_size() as u64
755 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
756 }
757
758 pub fn get_total_rows_written(&self) -> u64 {
761 self.column_metrics.total_rows_written
762 }
763
764 pub fn get_descriptor(&self) -> &ColumnDescPtr {
766 &self.descr
767 }
768
769 pub fn close(mut self) -> Result<ColumnCloseResult> {
772 if self.page_metrics.num_buffered_values > 0 {
773 self.add_data_page()?;
774 }
775 if self.encoder.has_dictionary() {
776 self.write_dictionary_page()?;
777 }
778 self.flush_data_pages()?;
779 let metadata = self.build_column_metadata()?;
780 self.page_writer.close()?;
781
782 let boundary_order = match (
783 self.data_page_boundary_ascending,
784 self.data_page_boundary_descending,
785 ) {
786 (true, _) => BoundaryOrder::ASCENDING,
789 (false, true) => BoundaryOrder::DESCENDING,
790 (false, false) => BoundaryOrder::UNORDERED,
791 };
792 self.column_index_builder.set_boundary_order(boundary_order);
793
794 let column_index = match self.column_index_builder.valid() {
795 true => Some(self.column_index_builder.build()?),
796 false => None,
797 };
798
799 let offset_index = self.offset_index_builder.map(|b| b.build());
800
801 Ok(ColumnCloseResult {
802 bytes_written: self.column_metrics.total_bytes_written,
803 rows_written: self.column_metrics.total_rows_written,
804 bloom_filter: self.encoder.flush_bloom_filter(),
805 metadata,
806 column_index,
807 offset_index,
808 })
809 }
810
811 #[allow(clippy::too_many_arguments)]
825 #[inline(never)]
826 fn write_granular_chunk(
827 &mut self,
828 values: &E::Values,
829 values_offset: usize,
830 value_indices: Option<&[usize]>,
831 chunk_size: usize,
832 chunk_def: LevelDataRef<'_>,
833 chunk_rep: LevelDataRef<'_>,
834 sub_batch_size: usize,
835 ) -> Result<usize> {
836 debug_assert!(sub_batch_size >= 1, "chunker must size at least one level");
839 let mut values_consumed = 0;
840 let mut sub_start = 0;
841 while sub_start < chunk_size {
842 let sub_end = match chunk_rep {
843 LevelDataRef::Materialized(levels) => {
844 let mut e = (sub_start + sub_batch_size).min(chunk_size);
852 while e < chunk_size && levels[e] != 0 {
853 e += 1;
854 }
855 e
856 }
857 _ => (sub_start + sub_batch_size).min(chunk_size),
858 };
859 let sub_len = sub_end - sub_start;
860 let written = self.write_mini_batch(
861 values,
862 values_offset + values_consumed,
863 value_indices,
864 sub_len,
865 chunk_def.slice(sub_start, sub_len),
866 chunk_rep.slice(sub_start, sub_len),
867 )?;
868 values_consumed += written;
869 sub_start = sub_end;
870 }
871 Ok(values_consumed)
872 }
873
874 fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
876 match props.writer_version() {
877 WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level),
878 WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
879 }
880 }
881
    /// Writes a single mini-batch of `num_levels` level slots.
    ///
    /// Steps, in order:
    /// 1. For columns with a non-zero max definition level: encodes `def_levels`,
    ///    updates the page's definition level histogram (if enabled) and null
    ///    count, and derives how many values are present.
    /// 2. For columns with a non-zero max repetition level: checks that the batch
    ///    starts at a record boundary, encodes `rep_levels`, updates the repetition
    ///    level histogram (if enabled) and counts the rows started. Otherwise every
    ///    slot counts as one row.
    /// 3. Passes the present values to the value encoder, starting at
    ///    `values_offset`; when `value_indices` is given, values are gathered
    ///    through it.
    /// 4. Closes the current data page and/or triggers the dictionary fallback when
    ///    `should_add_data_page` / `should_dict_fallback` say so.
    ///
    /// Returns the number of values consumed.
    ///
    /// # Errors
    ///
    /// Fails when required definition or repetition levels are missing (or the
    /// repetition levels are empty), when the first repetition level is not 0, or
    /// when encoding or page handling fails.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: LevelDataRef<'_>,
        rep_levels: LevelDataRef<'_>,
    ) -> Result<usize> {
        // Number of non-null values in this mini-batch; for required columns every
        // slot holds a value.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let max_def = self.descr.max_def_level();
            match def_levels {
                LevelDataRef::Absent => {
                    return Err(general_err!(
                        "Definition levels are required, because max definition level = {}",
                        self.descr.max_def_level()
                    ));
                }
                LevelDataRef::Materialized(levels) => {
                    let mut values_to_write = 0usize;
                    // The observer receives (level, count) pairs; only slots at the
                    // max definition level carry a value. The two arms differ only in
                    // whether the page histogram is also updated.
                    let encoder = &mut self.def_levels_encoder;
                    match self.page_metrics.definition_level_histogram.as_mut() {
                        Some(histogram) => encoder.put_with_observer(levels, |level, count| {
                            values_to_write += count * (level == max_def) as usize;
                            histogram.increment_by(level, count as i64);
                        }),
                        None => encoder.put_with_observer(levels, |level, count| {
                            values_to_write += count * (level == max_def) as usize;
                        }),
                    };
                    // Slots below the max definition level are counted as nulls.
                    self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
                    values_to_write
                }
                LevelDataRef::Uniform { value, count } => {
                    // Encode `count` copies of `value` without materializing them.
                    let encoder = &mut self.def_levels_encoder;
                    match self.page_metrics.definition_level_histogram.as_mut() {
                        Some(histogram) => {
                            encoder.put_n_with_observer(value, count, |level, run_len| {
                                histogram.increment_by(level, run_len as i64);
                            })
                        }
                        None => encoder.put_n_with_observer(value, count, |_, _| {}),
                    };
                    // Either all `count` slots hold values or none do.
                    let values_to_write = count * (value == max_def) as usize;
                    self.page_metrics.num_page_nulls += (count - values_to_write) as u64;
                    values_to_write
                }
            }
        } else {
            num_levels
        };

        if self.descr.max_rep_level() > 0 {
            // Absent or empty repetition levels both surface as `None` here.
            let first_level = rep_levels.first().ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if first_level != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    first_level
                ));
            }

            // Each repetition level of 0 starts a new row.
            let mut new_rows = 0u32;
            match rep_levels {
                // `first()` returned `Some` above, so levels cannot be absent here.
                LevelDataRef::Absent => unreachable!(),
                LevelDataRef::Materialized(levels) => {
                    let encoder = &mut self.rep_levels_encoder;
                    match self.page_metrics.repetition_level_histogram.as_mut() {
                        Some(histogram) => encoder.put_with_observer(levels, |level, count| {
                            new_rows += (count as u32) * (level == 0) as u32;
                            histogram.increment_by(level, count as i64);
                        }),
                        None => encoder.put_with_observer(levels, |level, count| {
                            new_rows += (count as u32) * (level == 0) as u32;
                        }),
                    };
                }
                LevelDataRef::Uniform { value, count } => {
                    let encoder = &mut self.rep_levels_encoder;
                    match self.page_metrics.repetition_level_histogram.as_mut() {
                        Some(histogram) => {
                            encoder.put_n_with_observer(value, count, |level, run_len| {
                                new_rows += (run_len as u32) * (level == 0) as u32;
                                histogram.increment_by(level, run_len as i64);
                            })
                        }
                        None => encoder.put_n_with_observer(value, count, |level, run_len| {
                            new_rows += (run_len as u32) * (level == 0) as u32;
                        }),
                    };
                }
            }
            self.page_metrics.num_buffered_rows += new_rows;
        } else {
            // Non-repeated column: every slot (null or not) is its own row.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        // Hand the present values to the value encoder.
        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        // Page and dictionary limits are only checked after the values are buffered.
        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
1016
1017 #[inline]
1022 fn should_dict_fallback(&self) -> bool {
1023 match self.encoder.estimated_dict_page_size() {
1024 Some(size) => {
1025 size >= self
1026 .props
1027 .column_dictionary_page_size_limit(self.descr.path())
1028 }
1029 None => false,
1030 }
1031 }
1032
1033 #[inline]
1035 fn should_add_data_page(&self) -> bool {
1036 if self.page_metrics.num_buffered_values == 0 {
1041 return false;
1042 }
1043
1044 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
1045 || self.encoder.estimated_data_page_size()
1046 >= self.props.column_data_page_size_limit(self.descr.path())
1047 }
1048
1049 fn dict_fallback(&mut self) -> Result<()> {
1052 if self.page_metrics.num_buffered_values > 0 {
1054 self.add_data_page()?;
1055 }
1056 self.write_dictionary_page()?;
1057 self.flush_data_pages()?;
1058 Ok(())
1059 }
1060
1061 fn update_column_offset_index(
1063 &mut self,
1064 page_statistics: Option<&ValueStatistics<E::T>>,
1065 page_variable_length_bytes: Option<i64>,
1066 ) {
1067 let null_page =
1069 (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
1070 if null_page && self.column_index_builder.valid() {
1073 self.column_index_builder.append(
1074 null_page,
1075 vec![],
1076 vec![],
1077 self.page_metrics.num_page_nulls as i64,
1078 );
1079 } else if self.column_index_builder.valid() {
1080 match &page_statistics {
1083 None => {
1084 self.column_index_builder.to_invalid();
1085 }
1086 Some(stat) => {
1087 let new_min = stat.min_opt().unwrap();
1089 let new_max = stat.max_opt().unwrap();
1090 if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
1091 if self.data_page_boundary_ascending {
1092 let not_ascending = compare_greater(&self.descr, last_min, new_min)
1094 || compare_greater(&self.descr, last_max, new_max);
1095 if not_ascending {
1096 self.data_page_boundary_ascending = false;
1097 }
1098 }
1099
1100 if self.data_page_boundary_descending {
1101 let not_descending = compare_greater(&self.descr, new_min, last_min)
1103 || compare_greater(&self.descr, new_max, last_max);
1104 if not_descending {
1105 self.data_page_boundary_descending = false;
1106 }
1107 }
1108 }
1109 self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
1110
1111 if self.can_truncate_value() {
1112 self.column_index_builder.append(
1113 null_page,
1114 self.truncate_min_value(
1115 self.props.column_index_truncate_length(),
1116 stat.min_bytes_opt().unwrap(),
1117 )
1118 .0,
1119 self.truncate_max_value(
1120 self.props.column_index_truncate_length(),
1121 stat.max_bytes_opt().unwrap(),
1122 )
1123 .0,
1124 self.page_metrics.num_page_nulls as i64,
1125 );
1126 } else {
1127 self.column_index_builder.append(
1128 null_page,
1129 stat.min_bytes_opt().unwrap().to_vec(),
1130 stat.max_bytes_opt().unwrap().to_vec(),
1131 self.page_metrics.num_page_nulls as i64,
1132 );
1133 }
1134 }
1135 }
1136 }
1137
1138 self.column_index_builder.append_histograms(
1140 &self.page_metrics.repetition_level_histogram,
1141 &self.page_metrics.definition_level_histogram,
1142 );
1143
1144 if let Some(builder) = self.offset_index_builder.as_mut() {
1146 builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
1147 builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
1148 }
1149 }
1150
1151 fn can_truncate_value(&self) -> bool {
1153 match self.descr.physical_type() {
1154 Type::FIXED_LEN_BYTE_ARRAY
1158 if !matches!(
1159 self.descr.logical_type_ref(),
1160 Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
1161 ) =>
1162 {
1163 true
1164 }
1165 Type::BYTE_ARRAY => true,
1166 _ => false,
1168 }
1169 }
1170
1171 fn is_utf8(&self) -> bool {
1173 self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
1174 || self.get_descriptor().converted_type() == ConvertedType::UTF8
1175 }
1176
1177 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
1188 truncation_length
1189 .filter(|l| data.len() > *l)
1190 .and_then(|l|
1191 if self.is_utf8() {
1193 match str::from_utf8(data) {
1194 Ok(str_data) => truncate_utf8(str_data, l),
1195 Err(_) => Some(data[..l].to_vec()),
1196 }
1197 } else {
1198 Some(data[..l].to_vec())
1199 }
1200 )
1201 .map(|truncated| (truncated, true))
1202 .unwrap_or_else(|| (data.to_vec(), false))
1203 }
1204
1205 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
1219 truncation_length
1220 .filter(|l| data.len() > *l)
1221 .and_then(|l|
1222 if self.is_utf8() {
1224 match str::from_utf8(data) {
1225 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
1226 Err(_) => increment(data[..l].to_vec()),
1227 }
1228 } else {
1229 increment(data[..l].to_vec())
1230 }
1231 )
1232 .map(|truncated| (truncated, true))
1233 .unwrap_or_else(|| (data.to_vec(), false))
1234 }
1235
1236 fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
1239 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1240 match statistics {
1241 Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
1242 let (min, did_truncate_min) = self.truncate_min_value(
1243 self.props.statistics_truncate_length(),
1244 stats.min_bytes_opt().unwrap(),
1245 );
1246 let (max, did_truncate_max) = self.truncate_max_value(
1247 self.props.statistics_truncate_length(),
1248 stats.max_bytes_opt().unwrap(),
1249 );
1250 Statistics::ByteArray(
1251 ValueStatistics::new(
1252 Some(min.into()),
1253 Some(max.into()),
1254 stats.distinct_count(),
1255 stats.null_count_opt(),
1256 backwards_compatible_min_max,
1257 )
1258 .with_max_is_exact(!did_truncate_max)
1259 .with_min_is_exact(!did_truncate_min),
1260 )
1261 }
1262 Statistics::FixedLenByteArray(stats)
1263 if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
1264 {
1265 let (min, did_truncate_min) = self.truncate_min_value(
1266 self.props.statistics_truncate_length(),
1267 stats.min_bytes_opt().unwrap(),
1268 );
1269 let (max, did_truncate_max) = self.truncate_max_value(
1270 self.props.statistics_truncate_length(),
1271 stats.max_bytes_opt().unwrap(),
1272 );
1273 Statistics::FixedLenByteArray(
1274 ValueStatistics::new(
1275 Some(min.into()),
1276 Some(max.into()),
1277 stats.distinct_count(),
1278 stats.null_count_opt(),
1279 backwards_compatible_min_max,
1280 )
1281 .with_max_is_exact(!did_truncate_max)
1282 .with_min_is_exact(!did_truncate_min),
1283 )
1284 }
1285 stats => stats,
1286 }
1287 }
1288
    /// Turns the values and levels buffered for the current page into one data page.
    ///
    /// In order, this:
    /// - flushes the value encoder and folds the page's min/max (when both are present)
    ///   into the chunk-level min/max; per-page `ValueStatistics` are only built when
    ///   `statistics_enabled` is [`EnabledStatistics::Page`];
    /// - passes the (untruncated) page statistics and the variable-length byte count to
    ///   `update_column_offset_index`, and updates the chunk metrics from the page metrics;
    /// - keeps page statistics for the page header only if
    ///   `write_page_header_statistics` is enabled for this column path, truncating them;
    /// - builds a V1 or V2 data page according to the writer version;
    /// - queues the page in `data_pages` or writes it straight away, then advances
    ///   `total_rows_written` and resets the page metrics.
    ///
    /// # Errors
    ///
    /// Propagates errors from flushing the encoder, compressing, or writing the page.
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        // Encoded values for this page, plus their encoding, min/max and byte counts.
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Chunk-level min/max are maintained whenever the page reports them,
                // independently of whether page-level statistics are requested.
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // The column/offset index sees the statistics before any truncation.
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on the statistics are only needed for the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // V1 layout: repetition levels, definition levels, then values, with the
                // whole buffer compressed together.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // V2 layout: same order, but the levels are never compressed and their
                // byte lengths are recorded separately in the page.
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    rep_levels_byte_len = buffer.len();
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    // The buffer already holds the repetition levels.
                    def_levels_byte_len = buffer.len() - rep_levels_byte_len;
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Only the values are compressed, appended after the levels. The compressed
                // form is kept only if it is smaller than `uncompressed_size * threshold`;
                // otherwise the raw values are stored and the page is marked uncompressed.
                // NOTE(review): `uncompressed_size` includes the level bytes, while the
                // left-hand side is values only — confirm this is the intended ratio.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        let compressed_values_size = buffer.len() - buffer_len;
                        let threshold = self
                            .props
                            .column_data_page_v2_compression_ratio_threshold(self.descr.path());
                        if (compressed_values_size as f64) >= (uncompressed_size as f64) * threshold
                        {
                            // Roll back to just the levels and store the values raw.
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // While the encoder holds a dictionary, data pages are queued in `data_pages`
        // (drained in order by `flush_data_pages`) unless the page writer reports that it
        // defers dictionary ordering itself.
        // NOTE(review): presumably this lets the dictionary page be written before the
        // pages that reference it — confirm where `write_dictionary_page` is called.
        if self.encoder.has_dictionary() && !self.page_writer.defers_dictionary_ordering() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
        self.page_metrics.new_page();

        Ok(())
    }
1451
1452 #[inline]
1455 fn flush_data_pages(&mut self) -> Result<()> {
1456 if self.page_metrics.num_buffered_values > 0 {
1458 self.add_data_page()?;
1459 }
1460
1461 while let Some(page) = self.data_pages.pop_front() {
1462 self.write_data_page(page)?;
1463 }
1464
1465 Ok(())
1466 }
1467
1468 fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1470 let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1471 let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1472 let num_values = self.column_metrics.total_num_values as i64;
1473 let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1474 let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1476
1477 let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1478 .set_compression(self.codec)
1479 .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
1480 .set_page_encoding_stats(self.encoding_stats.clone())
1481 .set_total_compressed_size(total_compressed_size)
1482 .set_total_uncompressed_size(total_uncompressed_size)
1483 .set_num_values(num_values)
1484 .set_data_page_offset(data_page_offset)
1485 .set_dictionary_page_offset(dict_page_offset);
1486
1487 if self.statistics_enabled != EnabledStatistics::None {
1488 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1489
1490 let statistics = ValueStatistics::<E::T>::new(
1491 self.column_metrics.min_column_value.clone(),
1492 self.column_metrics.max_column_value.clone(),
1493 self.column_metrics.column_distinct_count,
1494 Some(self.column_metrics.num_column_nulls),
1495 false,
1496 )
1497 .with_backwards_compatible_min_max(backwards_compatible_min_max)
1498 .into();
1499
1500 let statistics = self.truncate_statistics(statistics);
1501
1502 builder = builder
1503 .set_statistics(statistics)
1504 .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1505 .set_repetition_level_histogram(
1506 self.column_metrics.repetition_level_histogram.take(),
1507 )
1508 .set_definition_level_histogram(
1509 self.column_metrics.definition_level_histogram.take(),
1510 );
1511
1512 if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
1513 builder = builder.set_geo_statistics(geo_stats);
1514 }
1515 }
1516
1517 builder = self.set_column_chunk_encryption_properties(builder);
1518
1519 let metadata = builder.build()?;
1520 Ok(metadata)
1521 }
1522
1523 #[inline]
1525 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1526 self.encodings.insert(page.encoding());
1527 match self.encoding_stats.last_mut() {
1528 Some(encoding_stats)
1529 if encoding_stats.page_type == page.page_type()
1530 && encoding_stats.encoding == page.encoding() =>
1531 {
1532 encoding_stats.count += 1;
1533 }
1534 _ => {
1535 self.encoding_stats.push(PageEncodingStats {
1538 page_type: page.page_type(),
1539 encoding: page.encoding(),
1540 count: 1,
1541 });
1542 }
1543 }
1544 let page_spec = self.page_writer.write_page(page)?;
1545 if let Some(builder) = self.offset_index_builder.as_mut() {
1548 builder
1549 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1550 }
1551 self.update_metrics_for_page(page_spec);
1552 Ok(())
1553 }
1554
1555 #[inline]
1557 fn write_dictionary_page(&mut self) -> Result<()> {
1558 let compressed_page = {
1559 let mut page = self
1560 .encoder
1561 .flush_dict_page()?
1562 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1563
1564 let uncompressed_size = page.buf.len();
1565
1566 if let Some(ref mut cmpr) = self.compressor {
1567 let mut output_buf = Vec::with_capacity(uncompressed_size);
1568 cmpr.compress(&page.buf, &mut output_buf)?;
1569 page.buf = Bytes::from(output_buf);
1570 }
1571
1572 let dict_page = Page::DictionaryPage {
1573 buf: page.buf,
1574 num_values: page.num_values as u32,
1575 encoding: self.props.dictionary_page_encoding(),
1576 is_sorted: page.is_sorted,
1577 };
1578 CompressedPage::new(dict_page, uncompressed_size)
1579 };
1580
1581 self.encodings.insert(compressed_page.encoding());
1582 self.encoding_stats.push(PageEncodingStats {
1583 page_type: PageType::DICTIONARY_PAGE,
1584 encoding: compressed_page.encoding(),
1585 count: 1,
1586 });
1587 let page_spec = self.page_writer.write_page(compressed_page)?;
1588 self.update_metrics_for_page(page_spec);
1589 Ok(())
1591 }
1592
1593 #[inline]
1595 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1596 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1597 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1598 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1599
1600 match page_spec.page_type {
1601 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1602 self.column_metrics.total_num_values += page_spec.num_values as u64;
1603 if self.column_metrics.data_page_offset.is_none() {
1604 self.column_metrics.data_page_offset = Some(page_spec.offset);
1605 }
1606 }
1607 PageType::DICTIONARY_PAGE => {
1608 assert!(
1609 self.column_metrics.dictionary_page_offset.is_none(),
1610 "Dictionary offset is already set"
1611 );
1612 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1613 }
1614 _ => {}
1615 }
1616 }
1617
1618 #[inline]
1619 #[cfg(feature = "encryption")]
1620 fn set_column_chunk_encryption_properties(
1621 &self,
1622 builder: ColumnChunkMetaDataBuilder,
1623 ) -> ColumnChunkMetaDataBuilder {
1624 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1625 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1626 encryption_properties,
1627 &self.descr,
1628 ))
1629 } else {
1630 builder
1631 }
1632 }
1633
    /// Without the `encryption` feature there is no crypto metadata to attach, so the
    /// builder is returned unchanged.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1642}
1643
1644fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1645 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1646}
1647
1648fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1649 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1650}
1651
1652#[inline]
1653#[allow(clippy::eq_op)]
1654fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1655 match T::PHYSICAL_TYPE {
1656 Type::FLOAT | Type::DOUBLE => val != val,
1657 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
1658 let val = val.as_bytes();
1659 let val = f16::from_le_bytes([val[0], val[1]]);
1660 val.is_nan()
1661 }
1662 _ => false,
1663 }
1664}
1665
1666fn update_stat<T: ParquetValueType, F>(
1671 descr: &ColumnDescriptor,
1672 val: &T,
1673 cur: &mut Option<T>,
1674 should_update: F,
1675) where
1676 F: Fn(&T) -> bool,
1677{
1678 if is_nan(descr, val) {
1679 return;
1680 }
1681
1682 if cur.as_ref().is_none_or(should_update) {
1683 *cur = Some(val.clone());
1684 }
1685}
1686
1687fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1689 match T::PHYSICAL_TYPE {
1690 Type::INT32 | Type::INT64 => {
1691 if let Some(LogicalType::Integer(IntType {
1692 is_signed: false, ..
1693 })) = descr.logical_type_ref()
1694 {
1695 return compare_greater_unsigned_int(a, b);
1697 }
1698
1699 match descr.converted_type() {
1700 ConvertedType::UINT_8
1701 | ConvertedType::UINT_16
1702 | ConvertedType::UINT_32
1703 | ConvertedType::UINT_64 => {
1704 return compare_greater_unsigned_int(a, b);
1705 }
1706 _ => {}
1707 };
1708 }
1709 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1710 if let Some(LogicalType::Decimal(_)) = descr.logical_type_ref() {
1711 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1712 }
1713 if let ConvertedType::DECIMAL = descr.converted_type() {
1714 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1715 }
1716 if let Some(LogicalType::Float16) = descr.logical_type_ref() {
1717 return compare_greater_f16(a.as_bytes(), b.as_bytes());
1718 }
1719 }
1720
1721 _ => {}
1722 }
1723
1724 a > b
1726}
1727
1728fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1736 match (kind, props.writer_version()) {
1737 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1738 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1739 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1740 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1741 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1742 _ => Encoding::PLAIN,
1743 }
1744}
1745
1746fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1748 match (kind, props.writer_version()) {
1749 (Type::BOOLEAN, _) => false,
1751 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1753 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1754 _ => true,
1755 }
1756}
1757
1758#[inline]
1759fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
1760 a.as_u64().unwrap() > b.as_u64().unwrap()
1761}
1762
1763#[inline]
1764fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1765 let a = f16::from_le_bytes(a.try_into().unwrap());
1766 let b = f16::from_le_bytes(b.try_into().unwrap());
1767 a > b
1768}
1769
/// Returns `true` if the big-endian two's-complement integer in `a` is greater than
/// the one in `b`.
///
/// Used to order `DECIMAL` values stored as byte arrays. The inputs may differ in
/// length. The shorter one is treated as sign-extended to the longer length, so
/// `[0xFF, 0x85]` and `[0x85]` both mean -123 and compare equal.
///
/// An empty slice is ordered before every non-empty slice.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Different signs decide the result at once. So do different leading bytes when
    // the lengths match, because no sign extension is involved in that case.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Both values share a sign from here on; this is the byte the shorter one is
    // padded with when sign-extended.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        // If any of the longer value's extra leading bytes is not pure sign extension,
        // its magnitude exceeds anything the shorter value can represent.
        let not_equal = if a_length > b_length {
            let lead_length = a_length - b_length;
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            let lead_length = b_length - a_length;
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            return if negative_values { !a_longer } else { a_longer };
        }
    }

    // Any extra leading bytes are now pure sign extension, so the values can only
    // differ in their trailing `min_length` bytes. Two's-complement values of the same
    // sign order like their unsigned bit patterns, so compare those bytes unsigned.
    // The slices must be aligned on their least-significant ends: comparing `a[1..]`
    // with `b[1..]` misorders values of different lengths (e.g. 0x0003 vs 0x05).
    let min_length = a_length.min(b_length);
    a[a_length - min_length..] > b[b_length - min_length..]
}
1815
/// Returns, as owned bytes, the longest prefix of `data` that is at most `length`
/// bytes long and ends on a character boundary.
///
/// Returns `None` when that prefix would be empty. This happens when `length == 0`,
/// when `data` is empty, or when the first character alone is longer than `length`.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let end = (1..=length)
        .rev()
        .find(|&pos| data.is_char_boundary(pos))?;
    Some(data[..end].as_bytes().to_vec())
}
1825
1826fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1832 let lower_bound = length.saturating_sub(3);
1834 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1835 increment_utf8(data.get(..split)?)
1836}
1837
/// Increments the last character of `data` that can be bumped to the next code point
/// without changing its UTF-8 width.
///
/// Characters after the bumped one are dropped from the result. Returns `None` when
/// no character qualifies, for example:
/// - `data` is empty;
/// - every character is `char::MAX`;
/// - every character sits just below a UTF-8 width change or a surrogate gap.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        let bumped = char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;
        let mut out = data.as_bytes()[..start + width].to_vec();
        bumped.encode_utf8(&mut out[start..]);
        Some(out)
    })
}
1859
/// Adds one to `data` read as a big-endian unsigned integer of fixed width.
///
/// Returns `None` if the addition overflows, which happens when every byte is `0xFF`
/// or `data` is empty.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The rightmost byte that is not 0xFF absorbs the carry; everything after it wraps to 0.
    let carry_stop = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[carry_stop] += 1;
    data[carry_stop + 1..].fill(0);
    Some(data)
}
1875
1876#[cfg(test)]
1877mod tests {
1878 use crate::{
1879 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1880 schema::parser::parse_message_type,
1881 };
1882 use core::str;
1883 use rand::distr::uniform::SampleUniform;
1884 use std::{fs::File, sync::Arc};
1885
1886 use crate::column::{
1887 page::PageReader,
1888 reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1889 };
1890 use crate::file::writer::TrackedWrite;
1891 use crate::file::{
1892 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1893 };
1894 use crate::schema::types::{ColumnPath, Type as SchemaType};
1895 use crate::util::test_common::rand_gen::random_numbers_range;
1896
1897 use super::*;
1898
1899 #[test]
1900 fn test_column_writer_inconsistent_def_rep_length() {
1901 let page_writer = get_test_page_writer();
1902 let props = Default::default();
1903 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1904 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1905 assert!(res.is_err());
1906 if let Err(err) = res {
1907 assert_eq!(
1908 format!("{err}"),
1909 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1910 );
1911 }
1912 }
1913
1914 #[test]
1915 fn test_column_writer_invalid_def_levels() {
1916 let page_writer = get_test_page_writer();
1917 let props = Default::default();
1918 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1919 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1920 assert!(res.is_err());
1921 if let Err(err) = res {
1922 assert_eq!(
1923 format!("{err}"),
1924 "Parquet error: Definition levels are required, because max definition level = 1"
1925 );
1926 }
1927 }
1928
1929 #[test]
1930 fn test_column_writer_invalid_rep_levels() {
1931 let page_writer = get_test_page_writer();
1932 let props = Default::default();
1933 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1934 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1935 assert!(res.is_err());
1936 if let Err(err) = res {
1937 assert_eq!(
1938 format!("{err}"),
1939 "Parquet error: Repetition levels are required, because max repetition level = 1"
1940 );
1941 }
1942 }
1943
1944 #[test]
1945 fn test_column_writer_not_enough_values_to_write() {
1946 let page_writer = get_test_page_writer();
1947 let props = Default::default();
1948 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1949 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1950 assert!(res.is_err());
1951 if let Err(err) = res {
1952 assert_eq!(
1953 format!("{err}"),
1954 "Parquet error: Expected to write 4 values, but have only 2"
1955 );
1956 }
1957 }
1958
1959 #[test]
1960 fn test_column_writer_write_only_one_dictionary_page() {
1961 let page_writer = get_test_page_writer();
1962 let props = Default::default();
1963 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1964 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1965 writer.add_data_page().unwrap();
1967 writer.write_dictionary_page().unwrap();
1968 let err = writer.write_dictionary_page().unwrap_err().to_string();
1969 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1970 }
1971
1972 #[test]
1973 fn test_column_writer_error_when_writing_disabled_dictionary() {
1974 let page_writer = get_test_page_writer();
1975 let props = Arc::new(
1976 WriterProperties::builder()
1977 .set_dictionary_enabled(false)
1978 .build(),
1979 );
1980 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1981 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1982 let err = writer.write_dictionary_page().unwrap_err().to_string();
1983 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1984 }
1985
1986 #[test]
1987 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1988 let page_writer = get_test_page_writer();
1989 let props = Arc::new(
1990 WriterProperties::builder()
1991 .set_dictionary_enabled(true)
1992 .build(),
1993 );
1994 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1995 writer
1996 .write_batch(&[true, false, true, false], None, None)
1997 .unwrap();
1998
1999 let r = writer.close().unwrap();
2000 assert_eq!(r.bytes_written, 1);
2003 assert_eq!(r.rows_written, 4);
2004
2005 let metadata = r.metadata;
2006 assert_eq!(
2007 metadata.encodings().collect::<Vec<_>>(),
2008 vec![Encoding::PLAIN, Encoding::RLE]
2009 );
2010 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
2012 }
2013
2014 #[test]
2015 fn test_column_writer_default_encoding_support_bool() {
2016 check_encoding_write_support::<BoolType>(
2017 WriterVersion::PARQUET_1_0,
2018 true,
2019 &[true, false],
2020 None,
2021 &[Encoding::PLAIN, Encoding::RLE],
2022 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2023 );
2024 check_encoding_write_support::<BoolType>(
2025 WriterVersion::PARQUET_1_0,
2026 false,
2027 &[true, false],
2028 None,
2029 &[Encoding::PLAIN, Encoding::RLE],
2030 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2031 );
2032 check_encoding_write_support::<BoolType>(
2033 WriterVersion::PARQUET_2_0,
2034 true,
2035 &[true, false],
2036 None,
2037 &[Encoding::RLE],
2038 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
2039 );
2040 check_encoding_write_support::<BoolType>(
2041 WriterVersion::PARQUET_2_0,
2042 false,
2043 &[true, false],
2044 None,
2045 &[Encoding::RLE],
2046 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
2047 );
2048 }
2049
2050 #[test]
2051 fn test_column_writer_default_encoding_support_int32() {
2052 check_encoding_write_support::<Int32Type>(
2053 WriterVersion::PARQUET_1_0,
2054 true,
2055 &[1, 2],
2056 Some(0),
2057 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2058 &[
2059 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2060 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2061 ],
2062 );
2063 check_encoding_write_support::<Int32Type>(
2064 WriterVersion::PARQUET_1_0,
2065 false,
2066 &[1, 2],
2067 None,
2068 &[Encoding::PLAIN, Encoding::RLE],
2069 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2070 );
2071 check_encoding_write_support::<Int32Type>(
2072 WriterVersion::PARQUET_2_0,
2073 true,
2074 &[1, 2],
2075 Some(0),
2076 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2077 &[
2078 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2079 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2080 ],
2081 );
2082 check_encoding_write_support::<Int32Type>(
2083 WriterVersion::PARQUET_2_0,
2084 false,
2085 &[1, 2],
2086 None,
2087 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
2088 &[encoding_stats(
2089 PageType::DATA_PAGE_V2,
2090 Encoding::DELTA_BINARY_PACKED,
2091 1,
2092 )],
2093 );
2094 }
2095
2096 #[test]
2097 fn test_column_writer_default_encoding_support_int64() {
2098 check_encoding_write_support::<Int64Type>(
2099 WriterVersion::PARQUET_1_0,
2100 true,
2101 &[1, 2],
2102 Some(0),
2103 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2104 &[
2105 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2106 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2107 ],
2108 );
2109 check_encoding_write_support::<Int64Type>(
2110 WriterVersion::PARQUET_1_0,
2111 false,
2112 &[1, 2],
2113 None,
2114 &[Encoding::PLAIN, Encoding::RLE],
2115 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2116 );
2117 check_encoding_write_support::<Int64Type>(
2118 WriterVersion::PARQUET_2_0,
2119 true,
2120 &[1, 2],
2121 Some(0),
2122 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2123 &[
2124 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2125 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2126 ],
2127 );
2128 check_encoding_write_support::<Int64Type>(
2129 WriterVersion::PARQUET_2_0,
2130 false,
2131 &[1, 2],
2132 None,
2133 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
2134 &[encoding_stats(
2135 PageType::DATA_PAGE_V2,
2136 Encoding::DELTA_BINARY_PACKED,
2137 1,
2138 )],
2139 );
2140 }
2141
2142 #[test]
2143 fn test_column_writer_default_encoding_support_int96() {
2144 check_encoding_write_support::<Int96Type>(
2145 WriterVersion::PARQUET_1_0,
2146 true,
2147 &[Int96::from(vec![1, 2, 3])],
2148 Some(0),
2149 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2150 &[
2151 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2152 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2153 ],
2154 );
2155 check_encoding_write_support::<Int96Type>(
2156 WriterVersion::PARQUET_1_0,
2157 false,
2158 &[Int96::from(vec![1, 2, 3])],
2159 None,
2160 &[Encoding::PLAIN, Encoding::RLE],
2161 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2162 );
2163 check_encoding_write_support::<Int96Type>(
2164 WriterVersion::PARQUET_2_0,
2165 true,
2166 &[Int96::from(vec![1, 2, 3])],
2167 Some(0),
2168 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2169 &[
2170 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2171 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2172 ],
2173 );
2174 check_encoding_write_support::<Int96Type>(
2175 WriterVersion::PARQUET_2_0,
2176 false,
2177 &[Int96::from(vec![1, 2, 3])],
2178 None,
2179 &[Encoding::PLAIN, Encoding::RLE],
2180 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
2181 );
2182 }
2183
2184 #[test]
2185 fn test_column_writer_default_encoding_support_float() {
2186 check_encoding_write_support::<FloatType>(
2187 WriterVersion::PARQUET_1_0,
2188 true,
2189 &[1.0, 2.0],
2190 Some(0),
2191 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2192 &[
2193 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2194 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2195 ],
2196 );
2197 check_encoding_write_support::<FloatType>(
2198 WriterVersion::PARQUET_1_0,
2199 false,
2200 &[1.0, 2.0],
2201 None,
2202 &[Encoding::PLAIN, Encoding::RLE],
2203 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2204 );
2205 check_encoding_write_support::<FloatType>(
2206 WriterVersion::PARQUET_2_0,
2207 true,
2208 &[1.0, 2.0],
2209 Some(0),
2210 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2211 &[
2212 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2213 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2214 ],
2215 );
2216 check_encoding_write_support::<FloatType>(
2217 WriterVersion::PARQUET_2_0,
2218 false,
2219 &[1.0, 2.0],
2220 None,
2221 &[Encoding::PLAIN, Encoding::RLE],
2222 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
2223 );
2224 }
2225
2226 #[test]
2227 fn test_column_writer_default_encoding_support_double() {
2228 check_encoding_write_support::<DoubleType>(
2229 WriterVersion::PARQUET_1_0,
2230 true,
2231 &[1.0, 2.0],
2232 Some(0),
2233 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2234 &[
2235 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2236 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2237 ],
2238 );
2239 check_encoding_write_support::<DoubleType>(
2240 WriterVersion::PARQUET_1_0,
2241 false,
2242 &[1.0, 2.0],
2243 None,
2244 &[Encoding::PLAIN, Encoding::RLE],
2245 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2246 );
2247 check_encoding_write_support::<DoubleType>(
2248 WriterVersion::PARQUET_2_0,
2249 true,
2250 &[1.0, 2.0],
2251 Some(0),
2252 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2253 &[
2254 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2255 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2256 ],
2257 );
2258 check_encoding_write_support::<DoubleType>(
2259 WriterVersion::PARQUET_2_0,
2260 false,
2261 &[1.0, 2.0],
2262 None,
2263 &[Encoding::PLAIN, Encoding::RLE],
2264 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
2265 );
2266 }
2267
2268 #[test]
2269 fn test_column_writer_default_encoding_support_byte_array() {
2270 check_encoding_write_support::<ByteArrayType>(
2271 WriterVersion::PARQUET_1_0,
2272 true,
2273 &[ByteArray::from(vec![1u8])],
2274 Some(0),
2275 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2276 &[
2277 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2278 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2279 ],
2280 );
2281 check_encoding_write_support::<ByteArrayType>(
2282 WriterVersion::PARQUET_1_0,
2283 false,
2284 &[ByteArray::from(vec![1u8])],
2285 None,
2286 &[Encoding::PLAIN, Encoding::RLE],
2287 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2288 );
2289 check_encoding_write_support::<ByteArrayType>(
2290 WriterVersion::PARQUET_2_0,
2291 true,
2292 &[ByteArray::from(vec![1u8])],
2293 Some(0),
2294 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2295 &[
2296 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2297 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2298 ],
2299 );
2300 check_encoding_write_support::<ByteArrayType>(
2301 WriterVersion::PARQUET_2_0,
2302 false,
2303 &[ByteArray::from(vec![1u8])],
2304 None,
2305 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2306 &[encoding_stats(
2307 PageType::DATA_PAGE_V2,
2308 Encoding::DELTA_BYTE_ARRAY,
2309 1,
2310 )],
2311 );
2312 }
2313
2314 #[test]
2315 fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2316 check_encoding_write_support::<FixedLenByteArrayType>(
2317 WriterVersion::PARQUET_1_0,
2318 true,
2319 &[ByteArray::from(vec![1u8]).into()],
2320 None,
2321 &[Encoding::PLAIN, Encoding::RLE],
2322 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2323 );
2324 check_encoding_write_support::<FixedLenByteArrayType>(
2325 WriterVersion::PARQUET_1_0,
2326 false,
2327 &[ByteArray::from(vec![1u8]).into()],
2328 None,
2329 &[Encoding::PLAIN, Encoding::RLE],
2330 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2331 );
2332 check_encoding_write_support::<FixedLenByteArrayType>(
2333 WriterVersion::PARQUET_2_0,
2334 true,
2335 &[ByteArray::from(vec![1u8]).into()],
2336 Some(0),
2337 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2338 &[
2339 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2340 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2341 ],
2342 );
2343 check_encoding_write_support::<FixedLenByteArrayType>(
2344 WriterVersion::PARQUET_2_0,
2345 false,
2346 &[ByteArray::from(vec![1u8]).into()],
2347 None,
2348 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2349 &[encoding_stats(
2350 PageType::DATA_PAGE_V2,
2351 Encoding::DELTA_BYTE_ARRAY,
2352 1,
2353 )],
2354 );
2355 }
2356
2357 #[test]
2358 fn test_column_writer_check_metadata() {
2359 let page_writer = get_test_page_writer();
2360 let props = Default::default();
2361 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2362 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2363
2364 let r = writer.close().unwrap();
2365 assert_eq!(r.bytes_written, 20);
2366 assert_eq!(r.rows_written, 4);
2367
2368 let metadata = r.metadata;
2369 assert_eq!(
2370 metadata.encodings().collect::<Vec<_>>(),
2371 vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2372 );
2373 assert_eq!(metadata.num_values(), 4);
2374 assert_eq!(metadata.compressed_size(), 20);
2375 assert_eq!(metadata.uncompressed_size(), 20);
2376 assert_eq!(metadata.data_page_offset(), 0);
2377 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2378 if let Some(stats) = metadata.statistics() {
2379 assert_eq!(stats.null_count_opt(), Some(0));
2380 assert_eq!(stats.distinct_count_opt(), None);
2381 if let Statistics::Int32(stats) = stats {
2382 assert_eq!(stats.min_opt().unwrap(), &1);
2383 assert_eq!(stats.max_opt().unwrap(), &4);
2384 } else {
2385 panic!("expecting Statistics::Int32");
2386 }
2387 } else {
2388 panic!("metadata missing statistics");
2389 }
2390 }
2391
2392 #[test]
2393 fn test_column_writer_check_byte_array_min_max() {
2394 let page_writer = get_test_page_writer();
2395 let props = Default::default();
2396 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2397 writer
2398 .write_batch(
2399 &[
2400 ByteArray::from(vec![
2401 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2402 35u8, 231u8, 90u8, 0u8, 0u8,
2403 ]),
2404 ByteArray::from(vec![
2405 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2406 152u8, 177u8, 56u8, 0u8, 0u8,
2407 ]),
2408 ByteArray::from(vec![
2409 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2410 0u8,
2411 ]),
2412 ByteArray::from(vec![
2413 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2414 44u8, 0u8, 0u8,
2415 ]),
2416 ],
2417 None,
2418 None,
2419 )
2420 .unwrap();
2421 let metadata = writer.close().unwrap().metadata;
2422 if let Some(stats) = metadata.statistics() {
2423 if let Statistics::ByteArray(stats) = stats {
2424 assert_eq!(
2425 stats.min_opt().unwrap(),
2426 &ByteArray::from(vec![
2427 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2428 35u8, 231u8, 90u8, 0u8, 0u8,
2429 ])
2430 );
2431 assert_eq!(
2432 stats.max_opt().unwrap(),
2433 &ByteArray::from(vec![
2434 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2435 44u8, 0u8, 0u8,
2436 ])
2437 );
2438 } else {
2439 panic!("expecting Statistics::ByteArray");
2440 }
2441 } else {
2442 panic!("metadata missing statistics");
2443 }
2444 }
2445
2446 #[test]
2447 fn test_column_writer_uint32_converted_type_min_max() {
2448 let page_writer = get_test_page_writer();
2449 let props = Default::default();
2450 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2451 page_writer,
2452 0,
2453 0,
2454 props,
2455 );
2456 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2457 let metadata = writer.close().unwrap().metadata;
2458 if let Some(stats) = metadata.statistics() {
2459 if let Statistics::Int32(stats) = stats {
2460 assert_eq!(stats.min_opt().unwrap(), &0,);
2461 assert_eq!(stats.max_opt().unwrap(), &5,);
2462 } else {
2463 panic!("expecting Statistics::Int32");
2464 }
2465 } else {
2466 panic!("metadata missing statistics");
2467 }
2468 }
2469
2470 #[test]
2471 fn test_column_writer_precalculated_statistics() {
2472 let page_writer = get_test_page_writer();
2473 let props = Arc::new(
2474 WriterProperties::builder()
2475 .set_statistics_enabled(EnabledStatistics::Chunk)
2476 .build(),
2477 );
2478 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2479 writer
2480 .write_batch_with_statistics(
2481 &[1, 2, 3, 4],
2482 None,
2483 None,
2484 Some(&-17),
2485 Some(&9000),
2486 Some(55),
2487 )
2488 .unwrap();
2489
2490 let r = writer.close().unwrap();
2491 assert_eq!(r.bytes_written, 20);
2492 assert_eq!(r.rows_written, 4);
2493
2494 let metadata = r.metadata;
2495 assert_eq!(
2496 metadata.encodings().collect::<Vec<_>>(),
2497 vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2498 );
2499 assert_eq!(metadata.num_values(), 4);
2500 assert_eq!(metadata.compressed_size(), 20);
2501 assert_eq!(metadata.uncompressed_size(), 20);
2502 assert_eq!(metadata.data_page_offset(), 0);
2503 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2504 if let Some(stats) = metadata.statistics() {
2505 assert_eq!(stats.null_count_opt(), Some(0));
2506 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2507 if let Statistics::Int32(stats) = stats {
2508 assert_eq!(stats.min_opt().unwrap(), &-17);
2509 assert_eq!(stats.max_opt().unwrap(), &9000);
2510 } else {
2511 panic!("expecting Statistics::Int32");
2512 }
2513 } else {
2514 panic!("metadata missing statistics");
2515 }
2516 }
2517
2518 #[test]
2519 fn test_mixed_precomputed_statistics() {
2520 let mut buf = Vec::with_capacity(100);
2521 let mut write = TrackedWrite::new(&mut buf);
2522 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2523 let props = Arc::new(
2524 WriterProperties::builder()
2525 .set_write_page_header_statistics(true)
2526 .build(),
2527 );
2528 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2529
2530 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2531 writer
2532 .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2533 .unwrap();
2534
2535 let r = writer.close().unwrap();
2536
2537 let stats = r.metadata.statistics().unwrap();
2538 assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2539 assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2540 assert_eq!(stats.null_count_opt(), Some(0));
2541 assert!(stats.distinct_count_opt().is_none());
2542
2543 drop(write);
2544
2545 let props = ReaderProperties::builder()
2546 .set_backward_compatible_lz4(false)
2547 .set_read_page_statistics(true)
2548 .build();
2549 let reader = SerializedPageReader::new_with_properties(
2550 Arc::new(Bytes::from(buf)),
2551 &r.metadata,
2552 r.rows_written as usize,
2553 None,
2554 Arc::new(props),
2555 )
2556 .unwrap();
2557
2558 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2559 assert_eq!(pages.len(), 2);
2560
2561 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2562 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2563
2564 let page_statistics = pages[1].statistics().unwrap();
2565 assert_eq!(
2566 page_statistics.min_bytes_opt().unwrap(),
2567 1_i32.to_le_bytes()
2568 );
2569 assert_eq!(
2570 page_statistics.max_bytes_opt().unwrap(),
2571 7_i32.to_le_bytes()
2572 );
2573 assert_eq!(page_statistics.null_count_opt(), Some(0));
2574 assert!(page_statistics.distinct_count_opt().is_none());
2575 }
2576
2577 #[test]
2578 fn test_disabled_statistics() {
2579 let mut buf = Vec::with_capacity(100);
2580 let mut write = TrackedWrite::new(&mut buf);
2581 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2582 let props = WriterProperties::builder()
2583 .set_statistics_enabled(EnabledStatistics::None)
2584 .set_writer_version(WriterVersion::PARQUET_2_0)
2585 .build();
2586 let props = Arc::new(props);
2587
2588 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2589 writer
2590 .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2591 .unwrap();
2592
2593 let r = writer.close().unwrap();
2594 assert!(r.metadata.statistics().is_none());
2595
2596 drop(write);
2597
2598 let props = ReaderProperties::builder()
2599 .set_backward_compatible_lz4(false)
2600 .build();
2601 let reader = SerializedPageReader::new_with_properties(
2602 Arc::new(Bytes::from(buf)),
2603 &r.metadata,
2604 r.rows_written as usize,
2605 None,
2606 Arc::new(props),
2607 )
2608 .unwrap();
2609
2610 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2611 assert_eq!(pages.len(), 2);
2612
2613 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2614 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2615
2616 match &pages[1] {
2617 Page::DataPageV2 {
2618 num_values,
2619 num_nulls,
2620 num_rows,
2621 statistics,
2622 ..
2623 } => {
2624 assert_eq!(*num_values, 6);
2625 assert_eq!(*num_nulls, 2);
2626 assert_eq!(*num_rows, 6);
2627 assert!(statistics.is_none());
2628 }
2629 _ => unreachable!(),
2630 }
2631 }
2632
2633 #[test]
2634 fn test_column_writer_empty_column_roundtrip() {
2635 let props = Default::default();
2636 column_roundtrip::<Int32Type>(props, &[], None, None);
2637 }
2638
2639 #[test]
2640 fn test_column_writer_non_nullable_values_roundtrip() {
2641 let props = Default::default();
2642 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2643 }
2644
2645 #[test]
2646 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2647 let props = Default::default();
2648 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2649 }
2650
2651 #[test]
2652 fn test_column_writer_nullable_repeated_values_roundtrip() {
2653 let props = Default::default();
2654 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2655 }
2656
2657 #[test]
2658 fn test_column_writer_dictionary_fallback_small_data_page() {
2659 let props = WriterProperties::builder()
2660 .set_dictionary_page_size_limit(32)
2661 .set_data_page_size_limit(32)
2662 .build();
2663 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2664 }
2665
2666 #[test]
2667 fn test_column_writer_small_write_batch_size() {
2668 for i in &[1usize, 2, 5, 10, 11, 1023] {
2669 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2670
2671 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2672 }
2673 }
2674
2675 #[test]
2676 fn test_column_writer_dictionary_disabled_v1() {
2677 let props = WriterProperties::builder()
2678 .set_writer_version(WriterVersion::PARQUET_1_0)
2679 .set_dictionary_enabled(false)
2680 .build();
2681 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2682 }
2683
2684 #[test]
2685 fn test_column_writer_dictionary_disabled_v2() {
2686 let props = WriterProperties::builder()
2687 .set_writer_version(WriterVersion::PARQUET_2_0)
2688 .set_dictionary_enabled(false)
2689 .build();
2690 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2691 }
2692
2693 #[test]
2694 fn test_column_writer_compression_v1() {
2695 let props = WriterProperties::builder()
2696 .set_writer_version(WriterVersion::PARQUET_1_0)
2697 .set_compression(Compression::SNAPPY)
2698 .build();
2699 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2700 }
2701
2702 #[test]
2703 fn test_column_writer_compression_v2() {
2704 let props = WriterProperties::builder()
2705 .set_writer_version(WriterVersion::PARQUET_2_0)
2706 .set_compression(Compression::SNAPPY)
2707 .build();
2708 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2709 }
2710
2711 #[test]
2712 fn test_column_writer_v2_compression_ratio_threshold() {
2713 fn write_v2_page(threshold: f64) -> bool {
2714 let mut buf = Vec::with_capacity(4096);
2715 let mut write = TrackedWrite::new(&mut buf);
2716 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2717 let props = Arc::new(
2718 WriterProperties::builder()
2719 .set_writer_version(WriterVersion::PARQUET_2_0)
2720 .set_compression(Compression::SNAPPY)
2721 .set_dictionary_enabled(false)
2722 .set_data_page_v2_compression_ratio_threshold(threshold)
2723 .build(),
2724 );
2725
2726 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2727 let values: Vec<i32> = vec![42; 4096];
2728 writer.write_batch(&values, None, None).unwrap();
2729 let r = writer.close().unwrap();
2730 drop(write);
2731
2732 let reader_props = ReaderProperties::builder()
2733 .set_backward_compatible_lz4(false)
2734 .build();
2735 let reader = SerializedPageReader::new_with_properties(
2736 Arc::new(Bytes::from(buf)),
2737 &r.metadata,
2738 r.rows_written as usize,
2739 None,
2740 Arc::new(reader_props),
2741 )
2742 .unwrap();
2743 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2744 let data_page = pages
2745 .iter()
2746 .find(|p| p.page_type() == PageType::DATA_PAGE_V2)
2747 .expect("expected a v2 data page");
2748 match data_page {
2749 Page::DataPageV2 { is_compressed, .. } => *is_compressed,
2750 _ => unreachable!(),
2751 }
2752 }
2753
2754 assert!(write_v2_page(1.0));
2756 assert!(!write_v2_page(0.001));
2758 }
2759
2760 #[test]
2761 fn test_column_writer_add_data_pages_with_dict() {
2762 let mut file = tempfile::tempfile().unwrap();
2765 let mut write = TrackedWrite::new(&mut file);
2766 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2767 let props = Arc::new(
2768 WriterProperties::builder()
2769 .set_data_page_size_limit(10)
2770 .set_write_batch_size(3) .build(),
2772 );
2773 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2774 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2775 writer.write_batch(data, None, None).unwrap();
2776 let r = writer.close().unwrap();
2777
2778 drop(write);
2779
2780 let props = ReaderProperties::builder()
2782 .set_backward_compatible_lz4(false)
2783 .build();
2784 let mut page_reader = Box::new(
2785 SerializedPageReader::new_with_properties(
2786 Arc::new(file),
2787 &r.metadata,
2788 r.rows_written as usize,
2789 None,
2790 Arc::new(props),
2791 )
2792 .unwrap(),
2793 );
2794 let mut res = Vec::new();
2795 while let Some(page) = page_reader.get_next_page().unwrap() {
2796 res.push((page.page_type(), page.num_values(), page.buffer().len()));
2797 }
2798 assert_eq!(
2799 res,
2800 vec![
2801 (PageType::DICTIONARY_PAGE, 10, 40),
2802 (PageType::DATA_PAGE, 9, 10),
2803 (PageType::DATA_PAGE, 1, 3),
2804 ]
2805 );
2806 assert_eq!(
2807 r.metadata.page_encoding_stats(),
2808 Some(&vec![
2809 PageEncodingStats {
2810 page_type: PageType::DICTIONARY_PAGE,
2811 encoding: Encoding::PLAIN,
2812 count: 1
2813 },
2814 PageEncodingStats {
2815 page_type: PageType::DATA_PAGE,
2816 encoding: Encoding::RLE_DICTIONARY,
2817 count: 2,
2818 }
2819 ])
2820 );
2821 }
2822
2823 #[test]
2824 fn test_column_writer_column_data_page_size_limit() {
2825 let props = Arc::new(
2826 WriterProperties::builder()
2827 .set_writer_version(WriterVersion::PARQUET_1_0)
2828 .set_dictionary_enabled(false)
2829 .set_data_page_size_limit(1000)
2830 .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
2831 .set_write_batch_size(3)
2832 .build(),
2833 );
2834 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2835
2836 let col_values =
2837 write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
2838 let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
2839
2840 assert_eq!(col_values, vec![3, 3, 3, 1]);
2841 assert_eq!(other_values, vec![10]);
2842 }
2843
2844 #[test]
2845 fn test_column_writer_caps_page_size_for_large_byte_array_values() {
2846 let value_size = 64 * 1024; let page_byte_limit = 16 * 1024; let num_rows = 64;
2854
2855 let props = WriterProperties::builder()
2856 .set_writer_version(WriterVersion::PARQUET_1_0)
2857 .set_dictionary_enabled(false)
2858 .set_encoding(Encoding::PLAIN)
2859 .set_data_page_size_limit(page_byte_limit)
2860 .build();
2863
2864 let data: Vec<_> = (0..num_rows)
2865 .map(|i| ByteArray::from(vec![i as u8; value_size]))
2866 .collect();
2867 let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
2868
2869 let total_values: u32 = pages.data_pages.iter().map(|(_, n)| n).sum();
2871 assert_eq!(total_values as usize, num_rows);
2872 assert!(
2876 pages.data_pages.len() >= num_rows / 2,
2877 "expected pages to be cut close to one per value, got {:?}",
2878 pages.data_pages,
2879 );
2880 for (size, _) in &pages.data_pages {
2884 assert!(
2885 *size <= value_size + 64,
2886 "page size {size} exceeds one-value bound ({}B) — pages {:?}",
2887 value_size + 64,
2888 pages.data_pages,
2889 );
2890 }
2891 }
2892
2893 #[test]
2894 fn test_column_writer_caps_page_size_for_large_values_in_list() {
2895 let value_size = 32 * 1024;
2906 let page_byte_limit = 16 * 1024;
2907 let values_per_record = 3;
2908 let num_records = 3;
2909 let num_values = values_per_record * num_records;
2910
2911 let mut rep_levels = Vec::with_capacity(num_values);
2913 for _ in 0..num_records {
2914 rep_levels.push(0i16);
2915 rep_levels.extend(std::iter::repeat_n(1i16, values_per_record - 1));
2916 }
2917 let def_levels = vec![1i16; num_values];
2918
2919 let props = WriterProperties::builder()
2920 .set_writer_version(WriterVersion::PARQUET_1_0)
2921 .set_dictionary_enabled(false)
2922 .set_encoding(Encoding::PLAIN)
2923 .set_data_page_size_limit(page_byte_limit)
2924 .build();
2925
2926 let data: Vec<_> = (0..num_values)
2927 .map(|i| ByteArray::from(vec![i as u8; value_size]))
2928 .collect();
2929 let pages = write_and_collect_pages::<ByteArrayType>(
2930 props,
2931 1,
2932 1,
2933 &data,
2934 Some(&def_levels),
2935 Some(&rep_levels),
2936 );
2937 let data_pages = pages.data_pages;
2938
2939 assert_eq!(
2943 data_pages.len(),
2944 num_records,
2945 "expected one data page per record, got {data_pages:?}"
2946 );
2947 for (bytes, n_values) in &data_pages {
2948 assert_eq!(
2949 *n_values as usize, values_per_record,
2950 "each page must hold a whole record's leaves, got {data_pages:?}"
2951 );
2952 let upper_bound = values_per_record * (value_size + 16);
2956 assert!(
2957 *bytes <= upper_bound,
2958 "page size {bytes} exceeds whole-record bound ({upper_bound}); pages {data_pages:?}"
2959 );
2960 }
2961 }
2962
2963 #[test]
2964 fn test_column_writer_caps_page_size_with_nullable_large_values() {
2965 let value_size = 32 * 1024;
2971 let page_byte_limit = 16 * 1024;
2972 let num_levels = 32;
2973
2974 let def_levels: Vec<i16> = (0..num_levels as i16).map(|i| i % 2).collect();
2976 let num_values = def_levels.iter().filter(|&&d| d == 1).count();
2977
2978 let props = WriterProperties::builder()
2979 .set_writer_version(WriterVersion::PARQUET_1_0)
2980 .set_dictionary_enabled(false)
2981 .set_encoding(Encoding::PLAIN)
2982 .set_data_page_size_limit(page_byte_limit)
2983 .build();
2984
2985 let data: Vec<_> = (0..num_values)
2986 .map(|i| ByteArray::from(vec![i as u8; value_size]))
2987 .collect();
2988 let pages =
2989 write_and_collect_pages::<ByteArrayType>(props, 1, 0, &data, Some(&def_levels), None);
2990 let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
2991
2992 assert!(
2997 data_pages.len() >= num_values / 2,
2998 "expected at least {} pages for {num_values} large values, got {} pages: {data_pages:?}",
2999 num_values / 2,
3000 data_pages.len(),
3001 );
3002 for size in &data_pages {
3004 assert!(
3005 *size <= value_size + 64,
3006 "page size {size} exceeds one-value bound; pages {data_pages:?}"
3007 );
3008 }
3009 }
3010
3011 #[test]
3012 fn test_column_writer_dict_enabled_large_values_post_spill() {
3013 let value_size = 64 * 1024;
3023 let page_byte_limit = 16 * 1024;
3024 let num_rows = 32;
3025
3026 let props = WriterProperties::builder()
3027 .set_writer_version(WriterVersion::PARQUET_1_0)
3028 .set_dictionary_enabled(true)
3029 .set_dictionary_page_size_limit(1024)
3032 .set_data_page_size_limit(page_byte_limit)
3033 .set_write_batch_size(4)
3038 .build();
3039
3040 let data: Vec<_> = (0..num_rows)
3041 .map(|i| ByteArray::from(vec![i as u8; value_size]))
3042 .collect();
3043 let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
3044 let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
3045
3046 assert!(
3050 data_pages.len() >= num_rows / 2,
3051 "expected >= {} data pages after dict spill, got {} ({data_pages:?})",
3052 num_rows / 2,
3053 data_pages.len(),
3054 );
3055 for size in &data_pages {
3056 assert!(
3057 *size <= value_size + 64,
3058 "page size {size} exceeds one-value bound; pages {data_pages:?}"
3059 );
3060 }
3061 }
3062
3063 #[test]
3064 fn test_column_writer_caps_dictionary_page_size() {
3065 let value_size = 8 * 1024;
3073 let dict_page_limit = 64 * 1024;
3074 let num_rows = 2048;
3075
3076 let props = WriterProperties::builder()
3077 .set_writer_version(WriterVersion::PARQUET_1_0)
3078 .set_dictionary_enabled(true)
3079 .set_dictionary_page_size_limit(dict_page_limit)
3080 .build();
3081
3082 let data: Vec<_> = (0..num_rows)
3083 .map(|i| {
3084 let mut v = vec![0u8; value_size];
3086 v[..8].copy_from_slice(&(i as u64).to_le_bytes());
3087 ByteArray::from(v)
3088 })
3089 .collect();
3090 let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
3091 let dict_page_size = pages.dict_page_size;
3092
3093 assert!(
3094 dict_page_size > 0,
3095 "expected the column to dictionary-encode"
3096 );
3097 assert!(
3101 dict_page_size <= 3 * dict_page_limit,
3102 "dictionary page {dict_page_size} exceeds 3x the {dict_page_limit} limit",
3103 );
3104 }
3105
3106 #[test]
3107 fn test_column_writer_caps_page_size_for_fixed_len_byte_array() {
3108 let page_byte_limit = 4;
3114 let num_values = 128;
3115
3116 let props = WriterProperties::builder()
3117 .set_writer_version(WriterVersion::PARQUET_1_0)
3118 .set_dictionary_enabled(false)
3119 .set_encoding(Encoding::PLAIN)
3120 .set_data_page_size_limit(page_byte_limit)
3121 .build();
3122
3123 let data: Vec<_> = (0..num_values)
3124 .map(|i| {
3125 let mut fla = FixedLenByteArray::default();
3126 fla.set_data(Bytes::from(vec![i as u8]));
3127 fla
3128 })
3129 .collect();
3130 let pages =
3131 write_and_collect_pages::<FixedLenByteArrayType>(props, 0, 0, &data, None, None);
3132 let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
3133
3134 assert!(
3137 data_pages.len() >= num_values / 8,
3138 "expected pages capped by byte budget, got {data_pages:?}"
3139 );
3140 for size in &data_pages {
3141 assert!(
3142 *size <= page_byte_limit * 4,
3143 "page size {size} larger than expected; pages {data_pages:?}"
3144 );
3145 }
3146 }
3147
3148 #[test]
3149 fn test_bool_statistics() {
3150 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
3151 assert!(!stats.is_min_max_backwards_compatible());
3154 if let Statistics::Boolean(stats) = stats {
3155 assert_eq!(stats.min_opt().unwrap(), &false);
3156 assert_eq!(stats.max_opt().unwrap(), &true);
3157 } else {
3158 panic!("expecting Statistics::Boolean, got {stats:?}");
3159 }
3160 }
3161
3162 #[test]
3163 fn test_int32_statistics() {
3164 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
3165 assert!(stats.is_min_max_backwards_compatible());
3166 if let Statistics::Int32(stats) = stats {
3167 assert_eq!(stats.min_opt().unwrap(), &-2);
3168 assert_eq!(stats.max_opt().unwrap(), &3);
3169 } else {
3170 panic!("expecting Statistics::Int32, got {stats:?}");
3171 }
3172 }
3173
3174 #[test]
3175 fn test_int64_statistics() {
3176 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
3177 assert!(stats.is_min_max_backwards_compatible());
3178 if let Statistics::Int64(stats) = stats {
3179 assert_eq!(stats.min_opt().unwrap(), &-2);
3180 assert_eq!(stats.max_opt().unwrap(), &3);
3181 } else {
3182 panic!("expecting Statistics::Int64, got {stats:?}");
3183 }
3184 }
3185
3186 #[test]
3187 fn test_int96_statistics() {
3188 let input = vec![
3189 Int96::from(vec![1, 20, 30]),
3190 Int96::from(vec![3, 20, 10]),
3191 Int96::from(vec![0, 20, 30]),
3192 Int96::from(vec![2, 20, 30]),
3193 ]
3194 .into_iter()
3195 .collect::<Vec<Int96>>();
3196
3197 let stats = statistics_roundtrip::<Int96Type>(&input);
3198 assert!(!stats.is_min_max_backwards_compatible());
3199 if let Statistics::Int96(stats) = stats {
3200 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
3201 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
3202 } else {
3203 panic!("expecting Statistics::Int96, got {stats:?}");
3204 }
3205 }
3206
3207 #[test]
3208 fn test_float_statistics() {
3209 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
3210 assert!(stats.is_min_max_backwards_compatible());
3211 if let Statistics::Float(stats) = stats {
3212 assert_eq!(stats.min_opt().unwrap(), &-2.0);
3213 assert_eq!(stats.max_opt().unwrap(), &3.0);
3214 } else {
3215 panic!("expecting Statistics::Float, got {stats:?}");
3216 }
3217 }
3218
3219 #[test]
3220 fn test_double_statistics() {
3221 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
3222 assert!(stats.is_min_max_backwards_compatible());
3223 if let Statistics::Double(stats) = stats {
3224 assert_eq!(stats.min_opt().unwrap(), &-2.0);
3225 assert_eq!(stats.max_opt().unwrap(), &3.0);
3226 } else {
3227 panic!("expecting Statistics::Double, got {stats:?}");
3228 }
3229 }
3230
3231 #[test]
3232 fn test_byte_array_statistics() {
3233 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
3234 .iter()
3235 .map(|&s| s.into())
3236 .collect::<Vec<_>>();
3237
3238 let stats = statistics_roundtrip::<ByteArrayType>(&input);
3239 assert!(!stats.is_min_max_backwards_compatible());
3240 if let Statistics::ByteArray(stats) = stats {
3241 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
3242 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
3243 } else {
3244 panic!("expecting Statistics::ByteArray, got {stats:?}");
3245 }
3246 }
3247
3248 #[test]
3249 fn test_fixed_len_byte_array_statistics() {
3250 let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
3251 .iter()
3252 .map(|&s| ByteArray::from(s).into())
3253 .collect::<Vec<_>>();
3254
3255 let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
3256 assert!(!stats.is_min_max_backwards_compatible());
3257 if let Statistics::FixedLenByteArray(stats) = stats {
3258 let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
3259 assert_eq!(stats.min_opt().unwrap(), &expected_min);
3260 let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
3261 assert_eq!(stats.max_opt().unwrap(), &expected_max);
3262 } else {
3263 panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
3264 }
3265 }
3266
3267 #[test]
3268 fn test_column_writer_check_float16_min_max() {
3269 let input = [
3270 -f16::ONE,
3271 f16::from_f32(3.0),
3272 -f16::from_f32(2.0),
3273 f16::from_f32(2.0),
3274 ]
3275 .into_iter()
3276 .map(|s| ByteArray::from(s).into())
3277 .collect::<Vec<_>>();
3278
3279 let stats = float16_statistics_roundtrip(&input);
3280 assert!(stats.is_min_max_backwards_compatible());
3281 assert_eq!(
3282 stats.min_opt().unwrap(),
3283 &ByteArray::from(-f16::from_f32(2.0))
3284 );
3285 assert_eq!(
3286 stats.max_opt().unwrap(),
3287 &ByteArray::from(f16::from_f32(3.0))
3288 );
3289 }
3290
3291 #[test]
3292 fn test_column_writer_check_float16_nan_middle() {
3293 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
3294 .into_iter()
3295 .map(|s| ByteArray::from(s).into())
3296 .collect::<Vec<_>>();
3297
3298 let stats = float16_statistics_roundtrip(&input);
3299 assert!(stats.is_min_max_backwards_compatible());
3300 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
3301 assert_eq!(
3302 stats.max_opt().unwrap(),
3303 &ByteArray::from(f16::ONE + f16::ONE)
3304 );
3305 }
3306
3307 #[test]
3308 fn test_float16_statistics_nan_middle() {
3309 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
3310 .into_iter()
3311 .map(|s| ByteArray::from(s).into())
3312 .collect::<Vec<_>>();
3313
3314 let stats = float16_statistics_roundtrip(&input);
3315 assert!(stats.is_min_max_backwards_compatible());
3316 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
3317 assert_eq!(
3318 stats.max_opt().unwrap(),
3319 &ByteArray::from(f16::ONE + f16::ONE)
3320 );
3321 }
3322
3323 #[test]
3324 fn test_float16_statistics_nan_start() {
3325 let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
3326 .into_iter()
3327 .map(|s| ByteArray::from(s).into())
3328 .collect::<Vec<_>>();
3329
3330 let stats = float16_statistics_roundtrip(&input);
3331 assert!(stats.is_min_max_backwards_compatible());
3332 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
3333 assert_eq!(
3334 stats.max_opt().unwrap(),
3335 &ByteArray::from(f16::ONE + f16::ONE)
3336 );
3337 }
3338
3339 #[test]
3340 fn test_float16_statistics_nan_only() {
3341 let input = [f16::NAN, f16::NAN]
3342 .into_iter()
3343 .map(|s| ByteArray::from(s).into())
3344 .collect::<Vec<_>>();
3345
3346 let stats = float16_statistics_roundtrip(&input);
3347 assert!(stats.min_bytes_opt().is_none());
3348 assert!(stats.max_bytes_opt().is_none());
3349 assert!(stats.is_min_max_backwards_compatible());
3350 }
3351
3352 #[test]
3353 fn test_float16_statistics_zero_only() {
3354 let input = [f16::ZERO]
3355 .into_iter()
3356 .map(|s| ByteArray::from(s).into())
3357 .collect::<Vec<_>>();
3358
3359 let stats = float16_statistics_roundtrip(&input);
3360 assert!(stats.is_min_max_backwards_compatible());
3361 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
3362 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
3363 }
3364
3365 #[test]
3366 fn test_float16_statistics_neg_zero_only() {
3367 let input = [f16::NEG_ZERO]
3368 .into_iter()
3369 .map(|s| ByteArray::from(s).into())
3370 .collect::<Vec<_>>();
3371
3372 let stats = float16_statistics_roundtrip(&input);
3373 assert!(stats.is_min_max_backwards_compatible());
3374 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
3375 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
3376 }
3377
3378 #[test]
3379 fn test_float16_statistics_zero_min() {
3380 let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
3381 .into_iter()
3382 .map(|s| ByteArray::from(s).into())
3383 .collect::<Vec<_>>();
3384
3385 let stats = float16_statistics_roundtrip(&input);
3386 assert!(stats.is_min_max_backwards_compatible());
3387 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
3388 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
3389 }
3390
3391 #[test]
3392 fn test_float16_statistics_neg_zero_max() {
3393 let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
3394 .into_iter()
3395 .map(|s| ByteArray::from(s).into())
3396 .collect::<Vec<_>>();
3397
3398 let stats = float16_statistics_roundtrip(&input);
3399 assert!(stats.is_min_max_backwards_compatible());
3400 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
3401 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
3402 }
3403
3404 #[test]
3405 fn test_float_statistics_nan_middle() {
3406 let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
3407 assert!(stats.is_min_max_backwards_compatible());
3408 if let Statistics::Float(stats) = stats {
3409 assert_eq!(stats.min_opt().unwrap(), &1.0);
3410 assert_eq!(stats.max_opt().unwrap(), &2.0);
3411 } else {
3412 panic!("expecting Statistics::Float");
3413 }
3414 }
3415
3416 #[test]
3417 fn test_float_statistics_nan_start() {
3418 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
3419 assert!(stats.is_min_max_backwards_compatible());
3420 if let Statistics::Float(stats) = stats {
3421 assert_eq!(stats.min_opt().unwrap(), &1.0);
3422 assert_eq!(stats.max_opt().unwrap(), &2.0);
3423 } else {
3424 panic!("expecting Statistics::Float");
3425 }
3426 }
3427
3428 #[test]
3429 fn test_float_statistics_nan_only() {
3430 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
3431 assert!(stats.min_bytes_opt().is_none());
3432 assert!(stats.max_bytes_opt().is_none());
3433 assert!(stats.is_min_max_backwards_compatible());
3434 assert!(matches!(stats, Statistics::Float(_)));
3435 }
3436
3437 #[test]
3438 fn test_float_statistics_zero_only() {
3439 let stats = statistics_roundtrip::<FloatType>(&[0.0]);
3440 assert!(stats.is_min_max_backwards_compatible());
3441 if let Statistics::Float(stats) = stats {
3442 assert_eq!(stats.min_opt().unwrap(), &-0.0);
3443 assert!(stats.min_opt().unwrap().is_sign_negative());
3444 assert_eq!(stats.max_opt().unwrap(), &0.0);
3445 assert!(stats.max_opt().unwrap().is_sign_positive());
3446 } else {
3447 panic!("expecting Statistics::Float");
3448 }
3449 }
3450
3451 #[test]
3452 fn test_float_statistics_neg_zero_only() {
3453 let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
3454 assert!(stats.is_min_max_backwards_compatible());
3455 if let Statistics::Float(stats) = stats {
3456 assert_eq!(stats.min_opt().unwrap(), &-0.0);
3457 assert!(stats.min_opt().unwrap().is_sign_negative());
3458 assert_eq!(stats.max_opt().unwrap(), &0.0);
3459 assert!(stats.max_opt().unwrap().is_sign_positive());
3460 } else {
3461 panic!("expecting Statistics::Float");
3462 }
3463 }
3464
3465 #[test]
3466 fn test_float_statistics_zero_min() {
3467 let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
3468 assert!(stats.is_min_max_backwards_compatible());
3469 if let Statistics::Float(stats) = stats {
3470 assert_eq!(stats.min_opt().unwrap(), &-0.0);
3471 assert!(stats.min_opt().unwrap().is_sign_negative());
3472 assert_eq!(stats.max_opt().unwrap(), &2.0);
3473 } else {
3474 panic!("expecting Statistics::Float");
3475 }
3476 }
3477
#[test]
fn test_float_statistics_neg_zero_max() {
    // When -0.0 is the maximum it is written as +0.0; the NaN is ignored.
    let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(stats) => {
            assert_eq!(*stats.min_opt().unwrap(), -2.0);
            let max = *stats.max_opt().unwrap();
            assert_eq!(max, 0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
3490
#[test]
fn test_double_statistics_nan_middle() {
    // A NaN between finite values does not affect the min/max.
    let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    let Statistics::Double(stats) = stats else {
        panic!("expecting Statistics::Double");
    };
    assert_eq!(*stats.min_opt().unwrap(), 1.0);
    assert_eq!(*stats.max_opt().unwrap(), 2.0);
}
3502
#[test]
fn test_double_statistics_nan_start() {
    // A leading NaN does not affect the min/max.
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(stats) => {
            assert_eq!(*stats.min_opt().unwrap(), 1.0);
            assert_eq!(*stats.max_opt().unwrap(), 2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
3514
#[test]
fn test_double_statistics_nan_only() {
    // Only NaNs were written, so no bounds are recorded.
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
    assert_eq!(stats.min_bytes_opt(), None);
    assert_eq!(stats.max_bytes_opt(), None);
    assert!(matches!(stats, Statistics::Double(_)));
    assert!(stats.is_min_max_backwards_compatible());
}
3523
#[test]
fn test_double_statistics_zero_only() {
    // A lone +0.0 is recorded with bounds [-0.0, +0.0].
    let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    let Statistics::Double(stats) = stats else {
        panic!("expecting Statistics::Double");
    };
    let min = *stats.min_opt().unwrap();
    assert_eq!(min, -0.0);
    assert!(min.is_sign_negative());
    let max = *stats.max_opt().unwrap();
    assert_eq!(max, 0.0);
    assert!(max.is_sign_positive());
}
3537
#[test]
fn test_double_statistics_neg_zero_only() {
    // A lone -0.0 is also recorded with bounds [-0.0, +0.0].
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(stats) => {
            let min = *stats.min_opt().unwrap();
            assert_eq!(min, -0.0);
            assert!(min.is_sign_negative());
            let max = *stats.max_opt().unwrap();
            assert_eq!(max, 0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
3551
#[test]
fn test_double_statistics_zero_min() {
    // When +0.0 is the minimum it is written as -0.0; the NaN is ignored.
    let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    let Statistics::Double(stats) = stats else {
        panic!("expecting Statistics::Double");
    };
    let min = *stats.min_opt().unwrap();
    assert_eq!(min, -0.0);
    assert!(min.is_sign_negative());
    assert_eq!(*stats.max_opt().unwrap(), 2.0);
}
3564
#[test]
fn test_double_statistics_neg_zero_max() {
    // When -0.0 is the maximum it is written as +0.0; the NaN is ignored.
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(stats) => {
            assert_eq!(*stats.min_opt().unwrap(), -2.0);
            let max = *stats.max_opt().unwrap();
            assert_eq!(max, 0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
3577
#[test]
fn test_compare_greater_byte_array_decimals() {
    // Each case is (lhs, rhs, expected value of "lhs > rhs").
    // A leading 0xFF byte marks a negative value, so [255, 35, 0, 0] sorts below [0].
    let cases: &[(&[u8], &[u8], bool)] = &[
        (&[], &[], false),
        (&[1], &[], true),
        (&[], &[1], false),
        (&[1], &[0], true),
        (&[1], &[1], false),
        (&[1, 0], &[0], true),
        (&[0, 1], &[1, 0], false),
        (&[255, 35, 0, 0], &[0], false),
        (&[0], &[255, 35, 0, 0], true),
    ];
    for &(lhs, rhs, expected) in cases {
        assert_eq!(compare_greater_byte_array_decimals(lhs, rhs), expected);
    }
}
3599
#[test]
fn test_column_index_with_null_pages() {
    // A single page whose four slots all have definition level 0, i.e. every value is null.
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
    writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

    let result = writer.close().unwrap();
    assert!(result.column_index.is_some());
    let ColumnIndexMetaData::INT32(col_idx) = result.column_index.unwrap() else {
        panic!("wrong stats type");
    };

    // The all-null page is flagged as such and has no bounds.
    assert!(col_idx.is_null_page(0));
    assert!(col_idx.min_value(0).is_none());
    assert!(col_idx.max_value(0).is_none());

    // Null counts are still recorded.
    assert!(col_idx.null_count(0).is_some());
    assert_eq!(col_idx.null_count(0), Some(4));

    // Level histograms: none for repetition (the writer was built with 0 as the second
    // level argument, presumably the max rep level), and four zero def levels.
    assert!(col_idx.repetition_level_histogram(0).is_none());
    assert!(col_idx.definition_level_histogram(0).is_some());
    assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
}
3629
#[test]
fn test_column_offset_index_metadata() {
    // Two data pages of four non-null values each: [1, 2, 3, 4] and [4, 8, 2, -5].
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    // Force the first batch into its own page; `close` flushes the second one.
    writer.flush_data_pages().unwrap();
    writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(8, r.rows_written);

    let column_index = match column_index {
        ColumnIndexMetaData::INT32(column_index) => column_index,
        _ => panic!("wrong stats type"),
    };
    assert_eq!(2, column_index.num_pages());
    assert_eq!(2, offset_index.page_locations.len());
    // Page minima fall (1 -> -5) while page maxima rise (4 -> 8): neither order holds.
    assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
    for idx in 0..2 {
        assert!(!column_index.is_null_page(idx));
        // Check the null count of each page, not just the first.
        assert_eq!(0, column_index.null_count(idx).unwrap());
    }

    // Per-page bounds recorded in the column index.
    assert_eq!(column_index.min_value(0), Some(&1));
    assert_eq!(column_index.max_value(0), Some(&4));
    assert_eq!(column_index.min_value(1), Some(&-5));
    assert_eq!(column_index.max_value(1), Some(&8));

    if let Some(stats) = r.metadata.statistics() {
        assert_eq!(stats.null_count_opt(), Some(0));
        assert_eq!(stats.distinct_count_opt(), None);
        if let Statistics::Int32(stats) = stats {
            // Both the chunk-wide minimum (-5) and maximum (8) come from the second page.
            assert_eq!(stats.min_opt(), column_index.min_value(1));
            assert_eq!(stats.max_opt(), column_index.max_value(1));
        } else {
            panic!("expecting Statistics::Int32");
        }
    } else {
        panic!("metadata missing statistics");
    }

    // The second page starts right after the four rows of the first.
    assert_eq!(0, offset_index.page_locations[0].first_row_index);
    assert_eq!(4, offset_index.page_locations[1].first_row_index);
}
3682
#[test]
fn test_column_offset_index_metadata_truncating() {
    // Chunk statistics truncation is disabled here, while the column index is truncated
    // to DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH bytes.
    let page_writer = get_test_page_writer();
    let props = WriterProperties::builder()
        .set_statistics_truncate_length(None)
        .build()
        .into();
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    // Three 200-byte values filled with 'a' (97), 'p' (112) and 'b' (98).
    let data: Vec<FixedLenByteArray> = [97_u8, 112_u8, 98_u8]
        .into_iter()
        .map(|fill| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(vec![fill; 200]));
            value
        })
        .collect();

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();
    let ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) = column_index else {
        panic!("wrong stats type");
    };

    assert_eq!(3, r.rows_written);

    assert_eq!(1, column_index.num_pages());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.is_null_page(0));
    assert_eq!(Some(0), column_index.null_count(0));

    let Some(stats) = r.metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    let Statistics::FixedLenByteArray(stats) = stats else {
        panic!("expecting Statistics::FixedLenByteArray");
    };
    let index_min = column_index.min_value(0).unwrap();
    let index_max = column_index.max_value(0).unwrap();

    // The chunk statistics differ from the truncated column index bounds.
    assert_ne!(stats.min_bytes_opt().unwrap(), index_min);
    assert_ne!(stats.max_bytes_opt().unwrap(), index_max);

    // The truncated min is a plain prefix of the smallest value.
    assert_eq!(index_min.len(), DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap());
    assert_eq!(index_min, &[97_u8; 64]);
    // The truncated max is a prefix of the largest value with its last byte incremented.
    assert_eq!(index_max.len(), DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap());
    assert_eq!(
        *index_max.last().unwrap(),
        *index_max.first().unwrap() + 1
    );
}
3757
#[test]
fn test_column_offset_index_truncating_spec_example() {
    // Column index truncated to one byte: the min keeps the first byte ("B") and the
    // max increments it ("C").
    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut value = FixedLenByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    writer.write_batch(&[value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();
    let ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) = column_index else {
        panic!("wrong stats type");
    };

    assert_eq!(1, r.rows_written);

    assert_eq!(1, column_index.num_pages());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.is_null_page(0));
    assert_eq!(Some(0), column_index.null_count(0));

    let Some(stats) = r.metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    assert!(
        matches!(stats, Statistics::FixedLenByteArray(_)),
        "expecting Statistics::FixedLenByteArray"
    );

    let index_min = column_index.min_value(0).unwrap();
    let index_max = column_index.max_value(0).unwrap();

    assert_eq!(index_min.len(), 1);
    assert_eq!(index_max.len(), 1);

    assert_eq!("B".as_bytes(), index_min);
    assert_eq!("C".as_bytes(), index_max);

    // The column-index truncate length does not apply to the chunk statistics,
    // so those bounds differ from the 1-byte index bounds.
    assert_ne!(index_min, stats.min_bytes_opt().unwrap());
    assert_ne!(index_max, stats.max_bytes_opt().unwrap());
}
3817
#[test]
fn test_float16_min_max_no_truncation() {
    // A 1-byte column index truncate length must leave Float16 bounds intact.
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_float16_column_writer(get_test_page_writer(), props);

    let expected_value = f16::PI.to_le_bytes().to_vec();
    let data: Vec<FixedLenByteArray> = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    let ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) = r.column_index.unwrap() else {
        panic!("wrong stats type");
    };
    let (index_min, index_max) = (
        column_index.min_value(0).unwrap(),
        column_index.max_value(0).unwrap(),
    );
    assert_eq!(expected_value, index_min);
    assert_eq!(expected_value, index_max);

    let Statistics::FixedLenByteArray(stats) = r.metadata.statistics().unwrap() else {
        panic!("expecting Statistics::FixedLenByteArray");
    };
    let (stats_min, stats_max) = (stats.min_bytes_opt().unwrap(), stats.max_bytes_opt().unwrap());
    assert_eq!(expected_value, stats_min);
    assert_eq!(expected_value, stats_max);
}
3856
#[test]
fn test_decimal_min_max_no_truncation() {
    // A 1-byte column index truncate length must leave decimal bounds intact.
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let page_writer = get_test_page_writer();
    let mut writer =
        get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    // Eight 0xFF sign-extension bytes followed by the low-order bytes.
    let mut expected_value = vec![255u8; 8];
    expected_value.extend_from_slice(&[179u8, 172u8, 19u8, 35u8, 231u8, 90u8, 0u8, 0u8]);
    let data: Vec<FixedLenByteArray> = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    let ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) = r.column_index.unwrap() else {
        panic!("wrong stats type");
    };
    let (index_min, index_max) = (
        column_index.min_value(0).unwrap(),
        column_index.max_value(0).unwrap(),
    );
    assert_eq!(expected_value, index_min);
    assert_eq!(expected_value, index_max);

    let Statistics::FixedLenByteArray(stats) = r.metadata.statistics().unwrap() else {
        panic!("expecting Statistics::FixedLenByteArray");
    };
    let (stats_min, stats_max) = (stats.min_bytes_opt().unwrap(), stats.max_bytes_opt().unwrap());
    assert_eq!(expected_value, stats_min);
    assert_eq!(expected_value, stats_max);
}
3899
#[test]
fn test_statistics_truncating_byte_array_default() {
    // With default properties, chunk statistics for long values are truncated to
    // 64 bytes and marked as inexact.
    let page_writer = get_test_page_writer();
    let props = WriterProperties::builder().build().into();
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from(
        "This string is longer than 64 bytes, so it will almost certainly be truncated.",
    )));
    writer.write_batch(&[value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    assert_eq!(1, r.rows_written);

    let Statistics::ByteArray(ba_stats) = r.metadata.statistics().expect("statistics") else {
        panic!("expecting Statistics::ByteArray");
    };
    let min_value = ba_stats.min_opt().unwrap();
    let max_value = ba_stats.max_opt().unwrap();

    assert!(!ba_stats.min_is_exact());
    assert!(!ba_stats.max_is_exact());

    const EXPECTED_LEN: usize = 64;
    assert_eq!(min_value.len(), EXPECTED_LEN);
    assert_eq!(max_value.len(), EXPECTED_LEN);

    // The min is the 64-byte prefix as-is...
    assert_eq!(
        "This string is longer than 64 bytes, so it will almost certainly".as_bytes(),
        min_value.as_bytes()
    );
    // ...and the max is that prefix with its final character incremented ('y' -> 'z').
    assert_eq!(
        "This string is longer than 64 bytes, so it will almost certainlz".as_bytes(),
        max_value.as_bytes()
    );
}
3942
#[test]
fn test_statistics_truncating_byte_array() {
    // Chunk statistics truncated to one byte: the min keeps "B" and the max becomes "C".
    const TEST_TRUNCATE_LENGTH: usize = 1;

    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    writer.write_batch(&[value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    let Statistics::ByteArray(ba_stats) = stats else {
        panic!("expecting Statistics::ByteArray");
    };
    let (min_value, max_value) = (ba_stats.min_opt().unwrap(), ba_stats.max_opt().unwrap());

    // Truncated bounds are no longer exact.
    assert!(!ba_stats.min_is_exact());
    assert!(!ba_stats.max_is_exact());

    assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
    assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

    assert_eq!("B".as_bytes(), min_value.as_bytes());
    assert_eq!("C".as_bytes(), max_value.as_bytes());
}
3986
#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
    // A 16-byte big-endian pseudo-decimal whose chunk statistics are truncated to one byte.
    const TEST_TRUNCATE_LENGTH: usize = 1;
    const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
    const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

    // The truncated min keeps the leading byte; the truncated max increments it.
    const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
    const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
        [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut value = FixedLenByteArray::default();
    value.set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
    writer.write_batch(&[value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    let Statistics::FixedLenByteArray(flba_stats) = stats else {
        panic!("expecting Statistics::FixedLenByteArray");
    };
    let (min_value, max_value) = (flba_stats.min_opt().unwrap(), flba_stats.max_opt().unwrap());

    assert!(!flba_stats.min_is_exact());
    assert!(!flba_stats.max_is_exact());

    assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
    assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

    assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
    assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

    // Widen a single leading byte back into an i128, with every lower byte zeroed, so the
    // truncated bounds can be compared numerically against the original value.
    let widen = |leading: u8| {
        let mut be_bytes = [0u8; 16];
        be_bytes[0] = leading;
        i128::from_be_bytes(be_bytes)
    };
    let reconstructed_min = widen(min_value.as_bytes()[0]);
    let reconstructed_max = widen(max_value.as_bytes()[0]);

    println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
    assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
    println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
    assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
}
4082
#[test]
fn test_send() {
    // Compile-time check: the typed column writer implements `Send`.
    fn assert_send<W: Send>() {}
    assert_send::<ColumnWriterImpl<Int32Type>>();
}
4088
#[test]
fn test_increment() {
    // Big-endian increment with carry; an all-0xFF input overflows and yields None.
    let cases = [
        (vec![0, 0, 0], Some(vec![0, 0, 1])),
        (vec![0, 255, 255], Some(vec![1, 0, 0])),
        (vec![255, 255, 255], None),
    ];
    for (input, expected) in cases {
        assert_eq!(increment(input), expected);
    }
}
4102
#[test]
fn test_increment_utf8() {
    // Increments `input`, checks it against `expected`, and verifies the result sorts
    // strictly after the input both as a `str` and as a `ByteArray`.
    let check = |input: &str, expected: &str| {
        let Ok(incremented) = String::from_utf8(increment_utf8(input).unwrap()) else {
            panic!("Expected incremented UTF8 string to also be valid.");
        };
        assert_eq!(incremented, expected);
        assert!(incremented.as_str() > input);

        let mut original = ByteArray::new();
        original.set_data(Bytes::from(input.as_bytes().to_vec()));
        let mut greater = ByteArray::new();
        greater.set_data(Bytes::from(incremented));
        assert!(greater > original);
    };

    // (input, expected increment). `None` means no valid increment exists. A character
    // whose successor would need a longer encoding, or would be a surrogate, is dropped
    // and the preceding character is incremented instead.
    let cases: &[(&str, Option<&str>)] = &[
        ("hello", Some("hellp")),
        ("a\u{7f}", Some("b")),
        ("\u{7f}\u{7f}", None),
        ("❤️🧡💛💚💙💜", Some("❤️🧡💛💚💙💝")),
        ("éééé", Some("éééê")),
        ("\u{ff}\u{ff}", Some("\u{ff}\u{100}")),
        ("a\u{7ff}", Some("b")),
        ("\u{7ff}\u{7ff}", None),
        ("ࠀࠀ", Some("ࠀࠁ")),
        ("a\u{ffff}", Some("b")),
        ("\u{ffff}\u{ffff}", None),
        ("𐀀𐀀", Some("𐀀𐀁")),
        ("a\u{10ffff}", Some("b")),
        ("\u{10ffff}\u{10ffff}", None),
        ("a\u{D7FF}", Some("b")),
    ];
    for &(input, expected) in cases {
        match expected {
            Some(expected) => check(input, expected),
            None => assert!(increment_utf8(input).is_none()),
        }
    }
}
4169
#[test]
fn test_truncate_utf8() {
    let data = "❤️🧡💛💚💙💜";
    // Truncating to the full length is a no-op.
    let r = truncate_utf8(data, data.len()).unwrap();
    assert_eq!(r.len(), data.len());
    assert_eq!(&r, data.as_bytes());

    // Truncation backs off to the last complete character within the limit.
    let r = truncate_utf8(data, 13).unwrap();
    assert_eq!(r.len(), 10);
    assert_eq!(&r, "❤️🧡".as_bytes());

    // A 3-byte character cannot be cut down to a single byte.
    assert!(truncate_utf8("\u{0836}", 1).is_none());

    // (input, byte limit, expected truncated-and-incremented result or None).
    let cases: &[(&str, usize, Option<&str>)] = &[
        ("yyyyyyyyy", 8, Some("yyyyyyyz")),
        ("ééééé", 7, Some("ééê")),
        ("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8, Some("\u{ff}\u{ff}\u{ff}\u{100}")),
        ("߿߿߿߿߿", 8, None),
        ("ࠀࠀࠀࠀ", 8, Some("ࠀࠁ")),
        ("\u{ffff}\u{ffff}\u{ffff}", 8, None),
        ("𐀀𐀀𐀀𐀀", 9, Some("𐀀𐀁")),
        ("\u{10ffff}\u{10ffff}", 8, None),
    ];
    for &(input, limit, expected) in cases {
        assert_eq!(
            truncate_and_increment_utf8(input, limit).as_deref(),
            expected.map(str::as_bytes)
        );
    }
}
4221
#[test]
fn test_byte_array_truncate_invalid_utf8_statistics() {
    // A UTF8-annotated column holding bytes that are not valid UTF-8 (0x80 cannot start
    // a character). The expected max shows truncation falling back to a byte-wise
    // increment of the 8-byte prefix.
    let message_type = "
message test_schema {
OPTIONAL BYTE_ARRAY a (UTF8);
}
";
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Seven 32-byte values spread over ten slots, three of which are null.
    let data = vec![ByteArray::from(vec![128u8; 32]); 7];
    let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
    let file: File = tempfile::tempfile().unwrap();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Chunk)
            .set_statistics_truncate_length(Some(8))
            .build(),
    );

    let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<ByteArrayType>()
        .write_batch(&data, Some(&def_levels), None)
        .unwrap();
    col_writer.close().unwrap();
    row_group_writer.close().unwrap();
    let file_metadata = writer.close().unwrap();

    let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
    assert!(!stats.max_is_exact());
    // Eight bytes kept, the last one incremented from 0x80 to 0x81.
    assert_eq!(
        stats.max_bytes_opt().map(<[u8]>::to_vec),
        Some(vec![128, 128, 128, 128, 128, 128, 128, 129])
    );
}
4264
#[test]
fn test_increment_max_binary_chars() {
    // The carry ripples through trailing 0xFF bytes into the first non-0xFF byte.
    assert_eq!(
        increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]).unwrap(),
        [0xFF, 0xFE, 0xFE, 0x00, 0x00]
    );
    // Nothing is left to carry into when every byte is 0xFF.
    assert!(increment(vec![0xFF, 0xFF, 0xFF]).is_none());
}
4273
#[test]
fn test_no_column_index_when_stats_disabled() {
    // With statistics disabled no column index is produced, but an offset index still is.
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .build(),
    );
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        props,
        get_test_page_writer(),
    ));

    // Ten null slots (definition level 0) and no actual values.
    writer.write_batch(&[], Some(&[0; 10]), None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    assert!(result.offset_index.is_some());
    assert!(result.column_index.is_none());
}
4297
#[test]
fn test_no_offset_index_when_disabled() {
    // With statistics disabled and the offset index switched off, neither index is produced.
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_offset_index_disabled(true)
            .build(),
    );
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        props,
        get_test_page_writer(),
    ));

    // Ten null slots (definition level 0) and no actual values.
    writer.write_batch(&[], Some(&[0; 10]), None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    assert!(result.offset_index.is_none());
    assert!(result.column_index.is_none());
}
4320
#[test]
fn test_offset_index_overridden() {
    // Page-level statistics override `set_offset_index_disabled(true)`: both the column
    // index and the offset index are still produced.
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_offset_index_disabled(true)
            .build(),
    );
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        props,
        get_test_page_writer(),
    ));

    // Ten null slots (definition level 0) and no actual values.
    writer.write_batch(&[], Some(&[0; 10]), None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    assert!(result.offset_index.is_some());
    assert!(result.column_index.is_some());
}
4343
#[test]
fn test_boundary_order() -> Result<()> {
    // Writes one data page per slice in `pages` and returns the column index's
    // boundary order.
    fn boundary_order_of(
        descr: &Arc<ColumnDescriptor>,
        pages: &[&[Option<i32>]],
    ) -> Result<Option<BoundaryOrder>> {
        let close_result = write_multiple_pages::<Int32Type>(descr, pages)?;
        Ok(close_result.column_index.unwrap().get_boundary_order())
    }

    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));

    // Page mins and maxes both non-decreasing; the all-null page does not break the order.
    assert_eq!(
        boundary_order_of(
            &descr,
            &[
                &[Some(-10), Some(10)],
                &[Some(-5), Some(11)],
                &[None],
                &[Some(-5), Some(11)],
            ],
        )?,
        Some(BoundaryOrder::ASCENDING)
    );

    // Page mins and maxes both non-increasing.
    assert_eq!(
        boundary_order_of(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(5), Some(11)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?,
        Some(BoundaryOrder::DESCENDING)
    );

    // Identical pages count as ascending.
    assert_eq!(
        boundary_order_of(&descr, &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // Only null pages.
    assert_eq!(
        boundary_order_of(&descr, &[&[None], &[None], &[None]])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // A single page.
    assert_eq!(
        boundary_order_of(&descr, &[&[Some(-10), Some(10)]])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // A single page followed by a null page.
    assert_eq!(
        boundary_order_of(&descr, &[&[Some(-10), Some(10)], &[None]])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // Bounds rise and then fall.
    assert_eq!(
        boundary_order_of(
            &descr,
            &[
                &[Some(10), Some(11)],
                &[Some(11), Some(16)],
                &[None],
                &[Some(-5), Some(0)],
            ],
        )?,
        Some(BoundaryOrder::UNORDERED)
    );

    // Mins rise while maxes fall.
    assert_eq!(
        boundary_order_of(
            &descr,
            &[
                &[Some(1), Some(9)],
                &[Some(2), Some(8)],
                &[None],
                &[Some(3), Some(7)],
            ],
        )?,
        Some(BoundaryOrder::UNORDERED)
    );

    Ok(())
}
4451
#[test]
fn test_boundary_order_logical_type() -> Result<()> {
    // The same 2-byte values are written to a Float16 column and to an untyped
    // FIXED_LEN_BYTE_ARRAY(2) column. As Float16 (1, 0, -0, -1) they are descending;
    // as raw bytes they are unordered.
    let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
    let fba_descr = {
        let tpe = SchemaType::primitive_type_builder(
            "col",
            FixedLenByteArrayType::get_physical_type(),
        )
        .with_length(2)
        .build()?;
        Arc::new(ColumnDescriptor::new(
            Arc::new(tpe),
            1,
            0,
            ColumnPath::from("col"),
        ))
    };

    // One single-value page per entry.
    let values: &[&[Option<FixedLenByteArray>]] = &[
        &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
        &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
        &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ZERO)))],
        &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
    ];

    for (descr, expected) in [
        (&f16_descr, BoundaryOrder::DESCENDING),
        (&fba_descr, BoundaryOrder::UNORDERED),
    ] {
        let close_result = write_multiple_pages::<FixedLenByteArrayType>(descr, values)?;
        assert_eq!(
            close_result.column_index.unwrap().get_boundary_order(),
            Some(expected)
        );
    }

    Ok(())
}
4501
#[test]
fn test_interval_stats_should_not_have_min_max() {
    // INTERVAL columns must not record min/max statistics.
    // Input is three 12-byte values that differ only in their last byte (0, 1, 2).
    let input = (0u8..3)
        .map(|last| {
            let mut bytes = vec![0u8; 12];
            bytes[11] = last;
            ByteArray::from(bytes).into()
        })
        .collect::<Vec<_>>();

    let mut writer = get_test_interval_column_writer(get_test_page_writer());
    writer.write_batch(&input, None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
}
4526
#[test]
#[cfg(feature = "arrow")]
fn test_column_writer_get_estimated_total_bytes() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    assert_eq!(writer.get_estimated_total_bytes(), 0);

    // Each flushed data page adds its size to the running estimate.
    let mut expected = 0;
    for (batch, page_bytes) in [([1i32, 2, 3, 4], 20), ([5i32, 6, 7, 8], 21)] {
        writer.write_batch(&batch, None, None).unwrap();
        writer.add_data_page().unwrap();
        expected += page_bytes;
        assert_eq!(writer.get_estimated_total_bytes(), expected);
    }
}
4546
4547 fn write_multiple_pages<T: DataType>(
4548 column_descr: &Arc<ColumnDescriptor>,
4549 pages: &[&[Option<T::T>]],
4550 ) -> Result<ColumnCloseResult> {
4551 let column_writer = get_column_writer(
4552 column_descr.clone(),
4553 Default::default(),
4554 get_test_page_writer(),
4555 );
4556 let mut writer = get_typed_column_writer::<T>(column_writer);
4557
4558 for &page in pages {
4559 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
4560 let def_levels = page
4561 .iter()
4562 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
4563 .collect::<Vec<_>>();
4564 writer.write_batch(&values, Some(&def_levels), None)?;
4565 writer.flush_data_pages()?;
4566 }
4567
4568 writer.close()
4569 }
4570
4571 fn column_roundtrip_random<T: DataType>(
4575 props: WriterProperties,
4576 max_size: usize,
4577 min_value: T::T,
4578 max_value: T::T,
4579 max_def_level: i16,
4580 max_rep_level: i16,
4581 ) where
4582 T::T: PartialOrd + SampleUniform + Copy,
4583 {
4584 let mut num_values: usize = 0;
4585
4586 let mut buf: Vec<i16> = Vec::new();
4587 let def_levels = if max_def_level > 0 {
4588 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
4589 for &dl in &buf[..] {
4590 if dl == max_def_level {
4591 num_values += 1;
4592 }
4593 }
4594 Some(&buf[..])
4595 } else {
4596 num_values = max_size;
4597 None
4598 };
4599
4600 let mut buf: Vec<i16> = Vec::new();
4601 let rep_levels = if max_rep_level > 0 {
4602 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
4603 buf[0] = 0; Some(&buf[..])
4605 } else {
4606 None
4607 };
4608
4609 let mut values: Vec<T::T> = Vec::new();
4610 random_numbers_range(num_values, min_value, max_value, &mut values);
4611
4612 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
4613 }
4614
4615 fn column_roundtrip<T: DataType>(
4617 props: WriterProperties,
4618 values: &[T::T],
4619 def_levels: Option<&[i16]>,
4620 rep_levels: Option<&[i16]>,
4621 ) {
4622 let mut file = tempfile::tempfile().unwrap();
4623 let mut write = TrackedWrite::new(&mut file);
4624 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
4625
4626 let max_def_level = match def_levels {
4627 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
4628 None => 0i16,
4629 };
4630
4631 let max_rep_level = match rep_levels {
4632 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
4633 None => 0i16,
4634 };
4635
4636 let mut max_batch_size = values.len();
4637 if let Some(levels) = def_levels {
4638 max_batch_size = max_batch_size.max(levels.len());
4639 }
4640 if let Some(levels) = rep_levels {
4641 max_batch_size = max_batch_size.max(levels.len());
4642 }
4643
4644 let mut writer =
4645 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
4646
4647 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
4648 assert_eq!(values_written, values.len());
4649 let result = writer.close().unwrap();
4650
4651 drop(write);
4652
4653 let props = ReaderProperties::builder()
4654 .set_backward_compatible_lz4(false)
4655 .build();
4656 let page_reader = Box::new(
4657 SerializedPageReader::new_with_properties(
4658 Arc::new(file),
4659 &result.metadata,
4660 result.rows_written as usize,
4661 None,
4662 Arc::new(props),
4663 )
4664 .unwrap(),
4665 );
4666 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
4667
4668 let mut actual_values = Vec::with_capacity(max_batch_size);
4669 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
4670 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
4671
4672 let (_, values_read, levels_read) = reader
4673 .read_records(
4674 max_batch_size,
4675 actual_def_levels.as_mut(),
4676 actual_rep_levels.as_mut(),
4677 &mut actual_values,
4678 )
4679 .unwrap();
4680
4681 assert_eq!(&actual_values[..values_read], values);
4684 match actual_def_levels {
4685 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
4686 None => assert_eq!(None, def_levels),
4687 }
4688 match actual_rep_levels {
4689 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
4690 None => assert_eq!(None, rep_levels),
4691 }
4692
4693 if let Some(levels) = actual_rep_levels {
4696 let mut actual_rows_written = 0;
4697 for l in levels {
4698 if l == 0 {
4699 actual_rows_written += 1;
4700 }
4701 }
4702 assert_eq!(actual_rows_written, result.rows_written);
4703 } else if actual_def_levels.is_some() {
4704 assert_eq!(levels_read as u64, result.rows_written);
4705 } else {
4706 assert_eq!(values_read as u64, result.rows_written);
4707 }
4708 }
4709
4710 fn column_write_and_get_metadata<T: DataType>(
4713 props: WriterProperties,
4714 values: &[T::T],
4715 ) -> ColumnChunkMetaData {
4716 let page_writer = get_test_page_writer();
4717 let props = Arc::new(props);
4718 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4719 writer.write_batch(values, None, None).unwrap();
4720 writer.close().unwrap().metadata
4721 }
4722
4723 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4725 PageEncodingStats {
4726 page_type,
4727 encoding,
4728 count,
4729 }
4730 }
4731
4732 fn check_encoding_write_support<T: DataType>(
4736 version: WriterVersion,
4737 dict_enabled: bool,
4738 data: &[T::T],
4739 dictionary_page_offset: Option<i64>,
4740 encodings: &[Encoding],
4741 page_encoding_stats: &[PageEncodingStats],
4742 ) {
4743 let props = WriterProperties::builder()
4744 .set_writer_version(version)
4745 .set_dictionary_enabled(dict_enabled)
4746 .build();
4747 let meta = column_write_and_get_metadata::<T>(props, data);
4748 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4749 assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
4750 assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4751 }
4752
4753 fn get_test_column_writer<'a, T: DataType>(
4755 page_writer: Box<dyn PageWriter + 'a>,
4756 max_def_level: i16,
4757 max_rep_level: i16,
4758 props: WriterPropertiesPtr,
4759 ) -> ColumnWriterImpl<'a, T> {
4760 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4761 let column_writer = get_column_writer(descr, props, page_writer);
4762 get_typed_column_writer::<T>(column_writer)
4763 }
4764
4765 fn get_test_column_writer_with_path<'a, T: DataType>(
4766 page_writer: Box<dyn PageWriter + 'a>,
4767 max_def_level: i16,
4768 max_rep_level: i16,
4769 props: WriterPropertiesPtr,
4770 path: ColumnPath,
4771 ) -> ColumnWriterImpl<'a, T> {
4772 let descr = Arc::new(get_test_column_descr_with_path::<T>(
4773 max_def_level,
4774 max_rep_level,
4775 path,
4776 ));
4777 let column_writer = get_column_writer(descr, props, page_writer);
4778 get_typed_column_writer::<T>(column_writer)
4779 }
4780
/// Page-level summary of a column chunk, gathered by reading its pages back.
struct CollectedPages {
    /// Largest dictionary page buffer length seen, or 0 if there was none.
    dict_page_size: usize,
    /// One `(buffer length, value count)` pair per data page, in read order.
    data_pages: Vec<(usize, u32)>,
}
4788
4789 fn write_and_collect_pages<T: DataType>(
4794 props: WriterProperties,
4795 max_def_level: i16,
4796 max_rep_level: i16,
4797 data: &[T::T],
4798 def_levels: Option<&[i16]>,
4799 rep_levels: Option<&[i16]>,
4800 ) -> CollectedPages {
4801 let mut file = tempfile::tempfile().unwrap();
4802 let mut write = TrackedWrite::new(&mut file);
4803 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
4804 let mut writer =
4805 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
4806 writer.write_batch(data, def_levels, rep_levels).unwrap();
4807 let r = writer.close().unwrap();
4808 drop(write);
4809
4810 let read_props = ReaderProperties::builder()
4811 .set_backward_compatible_lz4(false)
4812 .build();
4813 let mut page_reader = Box::new(
4814 SerializedPageReader::new_with_properties(
4815 Arc::new(file),
4816 &r.metadata,
4817 r.rows_written as usize,
4818 None,
4819 Arc::new(read_props),
4820 )
4821 .unwrap(),
4822 );
4823
4824 let mut collected = CollectedPages {
4825 data_pages: Vec::new(),
4826 dict_page_size: 0,
4827 };
4828 while let Some(page) = page_reader.get_next_page().unwrap() {
4829 match page.page_type() {
4830 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
4831 collected
4832 .data_pages
4833 .push((page.buffer().len(), page.num_values()));
4834 }
4835 PageType::DICTIONARY_PAGE => {
4836 collected.dict_page_size = collected.dict_page_size.max(page.buffer().len());
4837 }
4838 _ => {}
4839 }
4840 }
4841 collected
4842 }
4843
4844 fn get_test_column_reader<T: DataType>(
4846 page_reader: Box<dyn PageReader>,
4847 max_def_level: i16,
4848 max_rep_level: i16,
4849 ) -> ColumnReaderImpl<T> {
4850 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4851 let column_reader = get_column_reader(descr, page_reader);
4852 get_typed_column_reader::<T>(column_reader)
4853 }
4854
4855 fn get_test_column_descr<T: DataType>(
4857 max_def_level: i16,
4858 max_rep_level: i16,
4859 ) -> ColumnDescriptor {
4860 let path = ColumnPath::from("col");
4861 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4862 .with_length(1)
4865 .build()
4866 .unwrap();
4867 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4868 }
4869
4870 fn get_test_column_descr_with_path<T: DataType>(
4871 max_def_level: i16,
4872 max_rep_level: i16,
4873 path: ColumnPath,
4874 ) -> ColumnDescriptor {
4875 let name = path.string();
4876 let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
4877 .with_length(1)
4880 .build()
4881 .unwrap();
4882 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4883 }
4884
4885 fn write_and_collect_page_values(
4886 path: ColumnPath,
4887 props: WriterPropertiesPtr,
4888 data: &[i32],
4889 ) -> Vec<u32> {
4890 let mut file = tempfile::tempfile().unwrap();
4891 let mut write = TrackedWrite::new(&mut file);
4892 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
4893 let mut writer =
4894 get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
4895 writer.write_batch(data, None, None).unwrap();
4896 let r = writer.close().unwrap();
4897
4898 drop(write);
4899
4900 let props = ReaderProperties::builder()
4901 .set_backward_compatible_lz4(false)
4902 .build();
4903 let mut page_reader = Box::new(
4904 SerializedPageReader::new_with_properties(
4905 Arc::new(file),
4906 &r.metadata,
4907 r.rows_written as usize,
4908 None,
4909 Arc::new(props),
4910 )
4911 .unwrap(),
4912 );
4913
4914 let mut values_per_page = Vec::new();
4915 while let Some(page) = page_reader.get_next_page().unwrap() {
4916 assert_eq!(page.page_type(), PageType::DATA_PAGE);
4917 values_per_page.push(page.num_values());
4918 }
4919
4920 values_per_page
4921 }
4922
4923 fn get_test_page_writer() -> Box<dyn PageWriter> {
4925 Box::new(TestPageWriter {})
4926 }
4927
4928 struct TestPageWriter {}
4929
4930 impl PageWriter for TestPageWriter {
4931 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4932 let mut res = PageWriteSpec::new();
4933 res.page_type = page.page_type();
4934 res.uncompressed_size = page.uncompressed_size();
4935 res.compressed_size = page.compressed_size();
4936 res.num_values = page.num_values();
4937 res.offset = 0;
4938 res.bytes_written = page.data().len() as u64;
4939 Ok(res)
4940 }
4941
4942 fn close(&mut self) -> Result<()> {
4943 Ok(())
4944 }
4945 }
4946
4947 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4949 let page_writer = get_test_page_writer();
4950 let props = Default::default();
4951 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4952 writer.write_batch(values, None, None).unwrap();
4953
4954 let metadata = writer.close().unwrap().metadata;
4955 if let Some(stats) = metadata.statistics() {
4956 stats.clone()
4957 } else {
4958 panic!("metadata missing statistics");
4959 }
4960 }
4961
4962 fn get_test_decimals_column_writer<T: DataType>(
4964 page_writer: Box<dyn PageWriter>,
4965 max_def_level: i16,
4966 max_rep_level: i16,
4967 props: WriterPropertiesPtr,
4968 ) -> ColumnWriterImpl<'static, T> {
4969 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4970 max_def_level,
4971 max_rep_level,
4972 ));
4973 let column_writer = get_column_writer(descr, props, page_writer);
4974 get_typed_column_writer::<T>(column_writer)
4975 }
4976
4977 fn get_test_decimals_column_descr<T: DataType>(
4979 max_def_level: i16,
4980 max_rep_level: i16,
4981 ) -> ColumnDescriptor {
4982 let path = ColumnPath::from("col");
4983 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4984 .with_length(16)
4985 .with_logical_type(Some(LogicalType::decimal(2, 3)))
4986 .with_scale(2)
4987 .with_precision(3)
4988 .build()
4989 .unwrap();
4990 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4991 }
4992
4993 fn float16_statistics_roundtrip(
4994 values: &[FixedLenByteArray],
4995 ) -> ValueStatistics<FixedLenByteArray> {
4996 let page_writer = get_test_page_writer();
4997 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4998 writer.write_batch(values, None, None).unwrap();
4999
5000 let metadata = writer.close().unwrap().metadata;
5001 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
5002 stats.clone()
5003 } else {
5004 panic!("metadata missing statistics");
5005 }
5006 }
5007
5008 fn get_test_float16_column_writer(
5009 page_writer: Box<dyn PageWriter>,
5010 props: WriterPropertiesPtr,
5011 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
5012 let descr = Arc::new(get_test_float16_column_descr(0, 0));
5013 let column_writer = get_column_writer(descr, props, page_writer);
5014 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
5015 }
5016
5017 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
5018 let path = ColumnPath::from("col");
5019 let tpe =
5020 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
5021 .with_length(2)
5022 .with_logical_type(Some(LogicalType::Float16))
5023 .build()
5024 .unwrap();
5025 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
5026 }
5027
5028 fn get_test_interval_column_writer(
5029 page_writer: Box<dyn PageWriter>,
5030 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
5031 let descr = Arc::new(get_test_interval_column_descr());
5032 let column_writer = get_column_writer(descr, Default::default(), page_writer);
5033 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
5034 }
5035
5036 fn get_test_interval_column_descr() -> ColumnDescriptor {
5037 let path = ColumnPath::from("col");
5038 let tpe =
5039 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
5040 .with_length(12)
5041 .with_converted_type(ConvertedType::INTERVAL)
5042 .build()
5043 .unwrap();
5044 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
5045 }
5046
5047 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
5049 page_writer: Box<dyn PageWriter + 'a>,
5050 max_def_level: i16,
5051 max_rep_level: i16,
5052 props: WriterPropertiesPtr,
5053 ) -> ColumnWriterImpl<'a, T> {
5054 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
5055 max_def_level,
5056 max_rep_level,
5057 ));
5058 let column_writer = get_column_writer(descr, props, page_writer);
5059 get_typed_column_writer::<T>(column_writer)
5060 }
5061
5062 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
5064 max_def_level: i16,
5065 max_rep_level: i16,
5066 ) -> ColumnDescriptor {
5067 let path = ColumnPath::from("col");
5068 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
5069 .with_converted_type(ConvertedType::UINT_32)
5070 .build()
5071 .unwrap();
5072 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
5073 }
5074
#[test]
fn test_page_v2_snappy_compression_fallback() {
    // A v2 page holding one tiny value with SNAPPY enabled. The test expects the
    // chunk's compressed size to equal its uncompressed size.
    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false)
        .set_compression(Compression::SNAPPY)
        .build();
    let mut column_writer = get_test_column_writer::<ByteArrayType>(
        Box::new(TestPageWriter {}),
        0,
        0,
        Arc::new(props),
    );

    column_writer
        .write_batch(&[ByteArray::from("a")], None, None)
        .unwrap();

    let metadata = column_writer.close().unwrap().metadata;
    assert_eq!(metadata.uncompressed_size(), metadata.compressed_size());
}
5103
5104 struct ColumnRoundTripUniform<'a, T: DataType> {
5105 props: WriterProperties,
5106 values: &'a [T::T],
5107 def_levels: LevelDataRef<'a>,
5108 rep_levels: LevelDataRef<'a>,
5109 max_def_level: i16,
5110 max_rep_level: i16,
5111 expected_values: &'a [T::T],
5112 expected_def_levels: Option<&'a [i16]>,
5113 expected_rep_levels: Option<&'a [i16]>,
5114 }
5115
5116 impl<'a, T: DataType> ColumnRoundTripUniform<'a, T>
5117 where
5118 T::T: PartialEq + std::fmt::Debug,
5119 {
5120 fn new() -> Self {
5121 Self {
5122 props: Default::default(),
5123 values: &[],
5124 def_levels: LevelDataRef::Absent,
5125 rep_levels: LevelDataRef::Absent,
5126 max_def_level: 0,
5127 max_rep_level: 0,
5128 expected_values: &[],
5129 expected_def_levels: None,
5130 expected_rep_levels: None,
5131 }
5132 }
5133
5134 fn with_props(mut self, props: WriterProperties) -> Self {
5135 self.props = props;
5136 self
5137 }
5138
5139 fn with_values(mut self, values: &'a [T::T]) -> Self {
5140 self.values = values;
5141 self
5142 }
5143
5144 fn with_def_levels(mut self, def_levels: LevelDataRef<'a>) -> Self {
5145 self.def_levels = def_levels;
5146 self
5147 }
5148
5149 fn with_rep_levels(mut self, rep_levels: LevelDataRef<'a>) -> Self {
5150 self.rep_levels = rep_levels;
5151 self
5152 }
5153
5154 fn with_max_def_level(mut self, max_def_level: i16) -> Self {
5155 self.max_def_level = max_def_level;
5156 self
5157 }
5158
5159 fn with_max_rep_level(mut self, max_rep_level: i16) -> Self {
5160 self.max_rep_level = max_rep_level;
5161 self
5162 }
5163
5164 fn with_expected_values(mut self, expected_values: &'a [T::T]) -> Self {
5165 self.expected_values = expected_values;
5166 self
5167 }
5168
5169 fn with_expected_def_levels(mut self, expected_def_levels: &'a [i16]) -> Self {
5170 self.expected_def_levels = Some(expected_def_levels);
5171 self
5172 }
5173
5174 fn with_expected_rep_levels(mut self, expected_rep_levels: &'a [i16]) -> Self {
5175 self.expected_rep_levels = Some(expected_rep_levels);
5176 self
5177 }
5178
5179 fn run(self) {
5182 let mut file = tempfile::tempfile().unwrap();
5183 let mut write = TrackedWrite::new(&mut file);
5184 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
5185 let mut writer = get_test_column_writer::<T>(
5186 page_writer,
5187 self.max_def_level,
5188 self.max_rep_level,
5189 Arc::new(self.props),
5190 );
5191
5192 writer
5193 .write_batch_internal(
5194 self.values,
5195 None,
5196 self.def_levels,
5197 self.rep_levels,
5198 None,
5199 None,
5200 None,
5201 )
5202 .unwrap();
5203 let result = writer.close().unwrap();
5204 drop(write);
5205
5206 let props = ReaderProperties::builder()
5207 .set_backward_compatible_lz4(false)
5208 .build();
5209 let page_reader = Box::new(
5210 SerializedPageReader::new_with_properties(
5211 Arc::new(file),
5212 &result.metadata,
5213 result.rows_written as usize,
5214 None,
5215 Arc::new(props),
5216 )
5217 .unwrap(),
5218 );
5219 let mut reader =
5220 get_test_column_reader::<T>(page_reader, self.max_def_level, self.max_rep_level);
5221
5222 let batch_size = self
5223 .expected_def_levels
5224 .map_or(self.expected_values.len(), |l| l.len());
5225 let mut actual_values = Vec::with_capacity(batch_size);
5226 let mut actual_def = self
5227 .expected_def_levels
5228 .map(|_| Vec::with_capacity(batch_size));
5229 let mut actual_rep = self
5230 .expected_rep_levels
5231 .map(|_| Vec::with_capacity(batch_size));
5232
5233 let (_, values_read, levels_read) = reader
5234 .read_records(
5235 batch_size,
5236 actual_def.as_mut(),
5237 actual_rep.as_mut(),
5238 &mut actual_values,
5239 )
5240 .unwrap();
5241
5242 assert_eq!(&actual_values[..values_read], self.expected_values);
5243 if let Some(ref v) = actual_def {
5244 assert_eq!(&v[..levels_read], self.expected_def_levels.unwrap());
5245 }
5246 if let Some(ref v) = actual_rep {
5247 assert_eq!(&v[..levels_read], self.expected_rep_levels.unwrap());
5248 }
5249 }
5250 }
5251
#[test]
fn test_level_data_ref_value_count() {
    let max_def = 2;

    // With no definition levels, every slot counts as a value.
    assert_eq!(LevelDataRef::Absent.value_count(64, max_def), 64);

    // A uniform run of 40 counts as 40 values only at the max definition level.
    let uniform_run = |value| LevelDataRef::Uniform { value, count: 40 };
    assert_eq!(uniform_run(max_def).value_count(40, max_def), 40);
    assert_eq!(uniform_run(max_def - 1).value_count(40, max_def), 0);

    // Materialized levels: four of these entries equal max_def.
    let levels = [2i16, 0, 2, 1, 2, 2, 0];
    let materialized = LevelDataRef::Materialized(&levels);
    assert_eq!(materialized.value_count(levels.len(), max_def), 4);
}
5287
#[test]
fn test_uniform_def_levels_all_null() {
    // Nullable column where every one of 100 slots is null.
    const NULL_COUNT: usize = 100;
    let expected_def_levels = [0i16; NULL_COUNT];
    ColumnRoundTripUniform::<Int32Type>::new()
        .with_max_def_level(1)
        .with_def_levels(LevelDataRef::Uniform {
            value: 0,
            count: NULL_COUNT,
        })
        .with_expected_def_levels(&expected_def_levels)
        .run();
}
5300
#[test]
fn test_uniform_def_levels_all_valid() {
    // Nullable column where all 50 slots hold a value (uniform max def level).
    let max_def_level: i16 = 1;
    let values = (0..50).collect::<Vec<i32>>();
    let expected_def_levels = vec![max_def_level; values.len()];
    let all_defined = LevelDataRef::Uniform {
        value: max_def_level,
        count: values.len(),
    };
    ColumnRoundTripUniform::<Int32Type>::new()
        .with_max_def_level(max_def_level)
        .with_values(&values)
        .with_def_levels(all_defined)
        .with_expected_values(&values)
        .with_expected_def_levels(&expected_def_levels)
        .run();
}
5318
#[test]
fn test_uniform_def_and_rep_levels() {
    // 200 records of one element each (rep level 0), all null (def level 0),
    // in a column with max def level 2 and max rep level 1.
    let count = 200;
    let zeros = vec![0i16; count];
    ColumnRoundTripUniform::<Int32Type>::new()
        .with_max_def_level(2)
        .with_max_rep_level(1)
        .with_def_levels(LevelDataRef::Uniform { value: 0, count })
        .with_rep_levels(LevelDataRef::Uniform { value: 0, count })
        .with_expected_def_levels(&zeros)
        .with_expected_rep_levels(&zeros)
        .run();
}
5337
#[test]
fn test_uniform_levels_v1_and_v2() {
    // The same all-null uniform input must round-trip under both writer versions.
    let null_count = 100;
    let expected_def_levels = vec![0i16; null_count];
    for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .build();
        ColumnRoundTripUniform::<Int32Type>::new()
            .with_props(props)
            .with_max_def_level(1)
            .with_def_levels(LevelDataRef::Uniform {
                value: 0,
                count: null_count,
            })
            .with_expected_def_levels(&expected_def_levels)
            .run();
    }
}
5356}