1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30 BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43 OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
// Dispatches expression `$b` against the typed writer bound to `$i`, for
// whichever variant of `Self` (a [`ColumnWriter`]) `$e` holds. Every arm runs
// the same body, so this behaves like a generic "visit" over the enum.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
67
/// Type-erased column writer: one variant per Parquet physical type, each
/// wrapping a strongly typed [`ColumnWriterImpl`].
pub enum ColumnWriter<'a> {
    /// Writer for `BOOLEAN` columns
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Writer for `INT32` columns
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Writer for `INT64` columns
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Writer for `INT96` columns
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Writer for `FLOAT` columns
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Writer for `DOUBLE` columns
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Writer for `BYTE_ARRAY` columns
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Writer for `FIXED_LEN_BYTE_ARRAY` columns
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
impl ColumnWriter<'_> {
    /// Returns the current memory usage of the underlying typed writer
    /// (see [`GenericColumnWriter::memory_size`]).
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total output size of the underlying typed writer
    /// (see [`GenericColumnWriter::get_estimated_total_bytes`]).
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Flushes the currently buffered values into a new data page
    /// (see [`GenericColumnWriter::add_data_page`]).
    #[cfg(feature = "arrow")]
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        downcast_writer!(self, typed, typed.add_data_page())
    }

    /// Closes the underlying typed writer, returning the metadata and index
    /// information accumulated for this column chunk.
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}
117
118pub fn get_column_writer<'a>(
120 descr: ColumnDescPtr,
121 props: WriterPropertiesPtr,
122 page_writer: Box<dyn PageWriter + 'a>,
123) -> ColumnWriter<'a> {
124 match descr.physical_type() {
125 Type::BOOLEAN => {
126 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127 }
128 Type::INT32 => {
129 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130 }
131 Type::INT64 => {
132 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133 }
134 Type::INT96 => {
135 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136 }
137 Type::FLOAT => {
138 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
139 }
140 Type::DOUBLE => {
141 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
142 }
143 Type::BYTE_ARRAY => {
144 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
145 }
146 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
147 ColumnWriterImpl::new(descr, props, page_writer),
148 ),
149 }
150}
151
152pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
157 T::get_column_writer(col_writer).unwrap_or_else(|| {
158 panic!(
159 "Failed to convert column writer into a typed column writer for `{}` type",
160 T::get_physical_type()
161 )
162 })
163}
164
165pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
167 col_writer: &'b ColumnWriter<'a>,
168) -> &'b ColumnWriterImpl<'a, T> {
169 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
170 panic!(
171 "Failed to convert column writer into a typed column writer for `{}` type",
172 T::get_physical_type()
173 )
174 })
175}
176
177pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
179 col_writer: &'a mut ColumnWriter<'b>,
180) -> &'a mut ColumnWriterImpl<'b, T> {
181 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
182 panic!(
183 "Failed to convert column writer into a typed column writer for `{}` type",
184 T::get_physical_type()
185 )
186 })
187}
188
/// Metadata and auxiliary structures returned when a column writer is closed.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written by this column writer
    pub bytes_written: u64,
    /// The total number of rows written by this column writer
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column, if one was built
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index (page-level min/max/null statistics)
    pub column_index: Option<ColumnIndexMetaData>,
    /// Optional offset index (page locations), if not disabled
    pub offset_index: Option<OffsetIndexMetaData>,
}
207
/// Counters tracked for the data page currently being buffered.
/// Reset via [`PageMetrics::new_page`] each time a page is flushed.
#[derive(Default)]
struct PageMetrics {
    /// Number of leaf values buffered for the current page
    num_buffered_values: u32,
    /// Number of rows (records) buffered for the current page
    num_buffered_rows: u32,
    /// Number of null values in the current page
    num_page_nulls: u64,
    /// Per-page repetition level histogram; `None` when not tracked
    repetition_level_histogram: Option<LevelHistogram>,
    /// Per-page definition level histogram; `None` when not tracked
    definition_level_histogram: Option<LevelHistogram>,
}
217
218impl PageMetrics {
219 fn new() -> Self {
220 Default::default()
221 }
222
223 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
225 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
226 self
227 }
228
229 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
231 self.definition_level_histogram = LevelHistogram::try_new(max_level);
232 self
233 }
234
235 fn new_page(&mut self) {
238 self.num_buffered_values = 0;
239 self.num_buffered_rows = 0;
240 self.num_page_nulls = 0;
241 self.repetition_level_histogram
242 .as_mut()
243 .map(LevelHistogram::reset);
244 self.definition_level_histogram
245 .as_mut()
246 .map(LevelHistogram::reset);
247 }
248
249 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
251 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
252 rep_hist.update_from_levels(levels);
253 }
254 }
255
256 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
258 if let Some(ref mut def_hist) = self.definition_level_histogram {
259 def_hist.update_from_levels(levels);
260 }
261 }
262}
263
/// Chunk-level metrics accumulated across all pages of a column chunk.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    /// Total bytes written to the page writer so far
    total_bytes_written: u64,
    /// Total rows written (accumulated as pages are flushed)
    total_rows_written: u64,
    /// Sum of uncompressed page sizes
    total_uncompressed_size: u64,
    /// Sum of compressed page sizes
    total_compressed_size: u64,
    /// Total number of values in the chunk
    total_num_values: u64,
    /// Byte offset of the dictionary page, if one was written
    dictionary_page_offset: Option<u64>,
    /// Byte offset of the first data page, if recorded
    data_page_offset: Option<u64>,
    /// Minimum value seen across the chunk, if statistics are tracked
    min_column_value: Option<T>,
    /// Maximum value seen across the chunk, if statistics are tracked
    max_column_value: Option<T>,
    /// Number of null values in the chunk
    num_column_nulls: u64,
    /// Caller-supplied distinct count; cleared once it can no longer be trusted
    column_distinct_count: Option<u64>,
    /// Total unencoded (variable-length) byte size, if tracked
    variable_length_bytes: Option<i64>,
    /// Chunk-level repetition level histogram
    repetition_level_histogram: Option<LevelHistogram>,
    /// Chunk-level definition level histogram
    definition_level_histogram: Option<LevelHistogram>,
}
282
283impl<T: Default> ColumnMetrics<T> {
284 fn new() -> Self {
285 Default::default()
286 }
287
288 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
290 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
291 self
292 }
293
294 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
296 self.definition_level_histogram = LevelHistogram::try_new(max_level);
297 self
298 }
299
300 fn update_histogram(
302 chunk_histogram: &mut Option<LevelHistogram>,
303 page_histogram: &Option<LevelHistogram>,
304 ) {
305 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
306 chunk_hist.add(page_hist);
307 }
308 }
309
310 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
313 ColumnMetrics::<T>::update_histogram(
314 &mut self.definition_level_histogram,
315 &page_metrics.definition_level_histogram,
316 );
317 ColumnMetrics::<T>::update_histogram(
318 &mut self.repetition_level_histogram,
319 &page_metrics.repetition_level_histogram,
320 );
321 }
322
323 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
325 if let Some(var_bytes) = variable_length_bytes {
326 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
327 }
328 }
329}
330
331pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
333
/// Typed, generic writer for a single primitive column chunk.
///
/// Buffers values through `encoder`, slices them into data pages, and writes
/// pages (plus an optional dictionary page) through `page_writer`, while
/// tracking the statistics and index information needed to close the chunk.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column description and configured writer properties
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    // Output sink and encoding/compression machinery
    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    // Metrics for the in-progress page and for the whole column chunk
    page_metrics: PageMetrics,
    column_metrics: ColumnMetrics<E::T>,

    /// All encodings used in this chunk (reported in the chunk metadata)
    encodings: BTreeSet<Encoding>,
    /// Per-(page type, encoding) page counts for the chunk metadata
    encoding_stats: Vec<PageEncodingStats>,
    /// Definition levels buffered for the current page
    def_levels_sink: Vec<i16>,
    /// Repetition levels buffered for the current page
    rep_levels_sink: Vec<i16>,
    /// Finished data pages held back until the dictionary page is written
    data_pages: VecDeque<CompressedPage>,
    /// Builder for the column index (page-level min/max/null statistics)
    column_index_builder: ColumnIndexBuilder,
    /// Builder for the offset index; `None` when disabled via properties
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Whether data-page min/max boundaries have stayed monotonic so far;
    // used to derive the column index boundary order on close.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    /// min/max of the most recent non-null data page, for boundary tracking
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
369
370impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Returns a new instance of [`GenericColumnWriter`] that writes pages via
    /// `page_writer`, configured according to `descr` and `props`.
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        // RLE is used for level information, so record that encoding up front.
        let mut encodings = BTreeSet::new();
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Level histograms are only tracked when statistics are enabled at
        // some level (page or chunk).
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // The column index requires page-level statistics; otherwise mark the
        // builder invalid so no index is produced on close.
        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        // Offset index construction may be disabled entirely via properties.
        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            def_levels_sink: vec![],
            rep_levels_sink: vec![],
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            // Both flags start true; they are cleared as pages violate the
            // corresponding order (see `update_column_offset_index`).
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }
435
    /// Writes a batch of values together with optional definition and
    /// repetition levels, and optional caller-provided statistics
    /// (`min`/`max`/`distinct_count`) for the batch.
    ///
    /// When `value_indices` is provided, values are gathered from `values` at
    /// those indices rather than read sequentially.
    ///
    /// Returns the number of values actually written.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Definition and repetition levels, when both present, must describe
        // the same number of level positions.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // With definition levels present the level count drives the batch;
        // otherwise every value is concrete and counts as one level.
        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // A caller-provided distinct count is only trusted when this batch is
        // the first data written; distinct counts cannot be merged, so any
        // later batch invalidates it.
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        // Process in `write_batch_size` slices, extending each slice forward
        // to the next record boundary (repetition level 0) so that a record is
        // never split across two mini batches.
        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        // Total number of values written across all mini batches.
        Ok(values_offset)
    }
514
    /// Writes a batch of values with optional definition and repetition
    /// levels, without any caller-provided statistics.
    ///
    /// Returns the number of values written; this may be smaller than
    /// `values.len()` when definition levels indicate nulls. See
    /// [`Self::write_batch_internal`] for details.
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }
535
    /// Writes a batch of values with optional levels and caller-provided
    /// `min`/`max`/`distinct_count` statistics for the batch, merging min/max
    /// into the running column statistics.
    ///
    /// Returns the number of values written. See
    /// [`Self::write_batch_internal`] for details.
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }
562
    /// Returns this writer's memory footprint: bytes written so far plus the
    /// encoder's estimate of its own buffered state.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }
571
    /// Returns the total number of bytes written to the page writer so far.
    /// Data still buffered for the in-progress page is not included.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
580
581 #[cfg(feature = "arrow")]
587 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
588 self.data_pages
589 .iter()
590 .map(|page| page.data().len() as u64)
591 .sum::<u64>()
592 + self.column_metrics.total_bytes_written
593 + self.encoder.estimated_data_page_size() as u64
594 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
595 }
596
    /// Returns the total number of rows written so far. Rows still buffered in
    /// the current page are counted only once that page is flushed.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
602
    /// Returns a reference to the descriptor of the column being written.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
607
    /// Finalizes all outstanding data and closes the column writer, returning
    /// the total bytes/rows written, the column chunk metadata, and any bloom
    /// filter, column index and offset index that were produced.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        // Flush any values still buffered into a final data page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        // The dictionary page is written before the buffered data pages.
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        // Derive the column index boundary order from the monotonicity flags.
        // Both flags start true; when neither order was ever violated (e.g.
        // all page boundaries equal, or at most one page), ASCENDING wins.
        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        // The column index is only produced when the builder stayed valid
        // (page statistics enabled and every page carried min/max).
        let column_index = match self.column_index_builder.valid() {
            true => Some(self.column_index_builder.build()?),
            false => None,
        };

        let offset_index = self.offset_index_builder.map(|b| b.build());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }
649
    /// Writes a single mini batch (one record-aligned slice of a larger
    /// batch) of values and levels. Returns the number of non-null values
    /// consumed from `values`.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // For optional/repeated fields, definition levels determine how many
        // of the level positions carry an actual value.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            // Only levels equal to the max definition level carry a value;
            // everything below contributes a null to the page.
            let values_to_write = levels
                .iter()
                .map(|level| (*level == self.descr.max_def_level()) as usize)
                .sum();
            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;

            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            // Required field: every level position is a concrete value.
            num_levels
        };

        if self.descr.max_rep_level() > 0 {
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            // Writes must not begin in the middle of a record.
            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // A repetition level of 0 marks the start of a new row.
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Non-repeated field: each level position is its own row.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        // Write the values, either gathered through explicit indices or as a
        // contiguous run starting at `values_offset`.
        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        // Flush a page and/or abandon dictionary encoding once limits are hit.
        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
738
739 #[inline]
744 fn should_dict_fallback(&self) -> bool {
745 match self.encoder.estimated_dict_page_size() {
746 Some(size) => {
747 size >= self
748 .props
749 .column_dictionary_page_size_limit(self.descr.path())
750 }
751 None => false,
752 }
753 }
754
    /// Returns `true` if the buffered data should be flushed into a data page.
    #[inline]
    fn should_add_data_page(&self) -> bool {
        // Never emit an empty page: a page must contain at least one value,
        // even when size/row thresholds are configured very low.
        if self.page_metrics.num_buffered_values == 0 {
            return false;
        }

        // Flush once either the row count limit or the estimated encoded page
        // size limit for this column is reached.
        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
            || self.encoder.estimated_data_page_size()
                >= self.props.column_data_page_size_limit(self.descr.path())
    }
770
    /// Performs dictionary fallback: flushes any buffered values into a page,
    /// then writes the dictionary page followed by all queued data pages.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point the fallback decision has already been made.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }
782
    /// Updates the column index and offset index builders with information
    /// about the page that is about to be flushed. Must run before the page
    /// metrics are reset.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // The page counts as a "null page" when every buffered row is null.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        // A null page is still recorded in the column index, with empty
        // min/max byte entries.
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            // A non-null page without statistics makes the whole column index
            // unbuildable; invalidate the builder.
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Page statistics built in `add_data_page` always carry
                    // both min and max, so these unwraps are safe here.
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    // Track whether page boundaries stay monotonic relative to
                    // the previous non-null page; this decides BoundaryOrder.
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // If either min or max decreased, pages are no
                            // longer in ascending order.
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // If either min or max increased, pages are no
                            // longer in descending order.
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    // Append min/max bytes, truncated when the column type
                    // permits truncation.
                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Record this page's level histograms in the column index.
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Record row count and (for byte arrays) unencoded data size for this
        // page in the offset index, when enabled.
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
872
873 fn can_truncate_value(&self) -> bool {
875 match self.descr.physical_type() {
876 Type::FIXED_LEN_BYTE_ARRAY
880 if !matches!(
881 self.descr.logical_type_ref(),
882 Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
883 ) =>
884 {
885 true
886 }
887 Type::BYTE_ARRAY => true,
888 _ => false,
890 }
891 }
892
    /// Returns `true` if this column holds UTF-8 string data, either via the
    /// `String` logical type or the legacy `UTF8` converted type.
    fn is_utf8(&self) -> bool {
        self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
            || self.get_descriptor().converted_type() == ConvertedType::UTF8
    }
898
    /// Truncates a *minimum* statistics value to at most `truncation_length`
    /// bytes (rounding down), returning the bytes together with a flag that is
    /// `true` iff truncation occurred.
    ///
    /// A `truncation_length` of `None`, or data already within the limit,
    /// returns the data unchanged. For UTF-8 columns the cut is attempted on a
    /// character boundary via `truncate_utf8`; if that yields nothing the data
    /// is returned untruncated. Invalid UTF-8 is truncated bytewise.
    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // Only attempt a character-boundary cut for string columns.
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_utf8(str_data, l),
                        // Invalid UTF-8 falls back to a bytewise cut.
                        Err(_) => Some(data[..l].to_vec()),
                    }
                } else {
                    Some(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }
926
    /// Truncates a *maximum* statistics value to at most `truncation_length`
    /// bytes, rounding up: after cutting, the tail is incremented (via
    /// `increment` / `truncate_and_increment_utf8`) so the result still
    /// upper-bounds the original data. Returns the bytes and a flag that is
    /// `true` iff truncation occurred.
    ///
    /// A `truncation_length` of `None`, data already within the limit, or a
    /// failed increment leaves the data untruncated.
    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l|
                // Only attempt a character-boundary cut for string columns.
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
                        // Invalid UTF-8 falls back to a bytewise cut+increment.
                        Err(_) => increment(data[..l].to_vec()),
                    }
                } else {
                    increment(data[..l].to_vec())
                }
            )
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }
957
    /// Applies the configured truncation to byte-array statistics: min values
    /// are truncated downwards and max values upwards, and the corresponding
    /// `is_exact` flags are cleared when truncation occurred.
    ///
    /// Fixed-length byte arrays are only truncated when
    /// [`Self::can_truncate_value`] permits; all other statistics pass
    /// through unchanged.
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    // Truncated values are no longer exact bounds.
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // All other statistics variants are passed through unchanged.
            stats => stats,
        }
    }
1010
    /// Assembles the buffered values and levels into a data page for the
    /// configured writer version, compresses it, and either queues it (when a
    /// dictionary page must be written first) or writes it out directly.
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        // Extract the encoded values (and any per-page statistics) buffered
        // in the encoder so far.
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        // Merge page min/max into the chunk statistics; materialize
        // page-level statistics only when page statistics are enabled.
        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // Feed the index builders before the page metrics are reset below.
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Roll page histograms and variable-length byte counts into the
        // chunk-level metrics.
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // Attach statistics to the page header only when configured to, and
        // truncate them to the configured length first.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                // V1 pages concatenate [rep levels][def levels][values] and
                // compress the whole buffer together.
                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                // V2 pages keep levels uncompressed and record their byte
                // lengths in the page header; only values get compressed.
                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // If the compressed values turn out at least as large as the
                // page's uncompressed size, store the values uncompressed.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        if uncompressed_size <= buffer.len() - buffer_len {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // With dictionary encoding, data pages are held back until the
        // dictionary page has been written first.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset per-page buffers and metrics for the next page.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1175
    /// Flushes any remaining buffered values into a page, then writes all
    /// queued data pages out to the page writer.
    #[inline]
    fn flush_data_pages(&mut self) -> Result<()> {
        // Write all outstanding buffered data into a final page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }

        while let Some(page) = self.data_pages.pop_front() {
            self.write_data_page(page)?;
        }

        Ok(())
    }
1191
    /// Assembles the [`ColumnChunkMetaData`] for this chunk from the
    /// accumulated metrics, statistics, and encoding bookkeeping.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // NOTE(review): falls back to 0 when no data page offset was ever
        // recorded — presumably no data pages were written; confirm.
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            // Chunk-level statistics get the same truncation as page-level.
            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                // `take()` is fine: the writer is being closed and the
                // histograms are not used afterwards.
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1246
    /// Encodes definition or repetition levels for a Data Page v1 using the
    /// given level `encoding`.
    #[inline]
    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }
1254
    /// Encodes definition or repetition levels for a Data Page v2.
    #[inline]
    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }
1263
    /// Writes a compressed data page to the page writer, updating the chunk's
    /// encoding bookkeeping, offset index, and byte/size metrics.
    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        // Coalesce consecutive pages sharing the same (page type, encoding)
        // into a single PageEncodingStats entry.
        match self.encoding_stats.last_mut() {
            Some(encoding_stats)
                if encoding_stats.page_type == page.page_type()
                    && encoding_stats.encoding == page.encoding() =>
            {
                encoding_stats.count += 1;
            }
            _ => {
                self.encoding_stats.push(PageEncodingStats {
                    page_type: page.page_type(),
                    encoding: page.encoding(),
                    count: 1,
                });
            }
        }
        let page_spec = self.page_writer.write_page(page)?;
        // Record where this page landed and how large it is in the offset
        // index, when enabled.
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }
1295
    /// Builds and writes the (single) dictionary page for this column chunk.
    ///
    /// Returns an error if the value encoder has no dictionary to flush —
    /// this happens when dictionary encoding is disabled or the dictionary
    /// page was already written.
    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            // Captured before compression so the metadata reflects the true
            // uncompressed size.
            let uncompressed_size = page.buf.len();

            // Compress the dictionary buffer when a codec is configured.
            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        self.encoding_stats.push(PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: compressed_page.encoding(),
            count: 1,
        });
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        Ok(())
    }
1333
1334 #[inline]
1336 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1337 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1338 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1339 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1340
1341 match page_spec.page_type {
1342 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1343 self.column_metrics.total_num_values += page_spec.num_values as u64;
1344 if self.column_metrics.data_page_offset.is_none() {
1345 self.column_metrics.data_page_offset = Some(page_spec.offset);
1346 }
1347 }
1348 PageType::DICTIONARY_PAGE => {
1349 assert!(
1350 self.column_metrics.dictionary_page_offset.is_none(),
1351 "Dictionary offset is already set"
1352 );
1353 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1354 }
1355 _ => {}
1356 }
1357 }
1358
    /// Attaches column-encryption metadata to the chunk builder when the
    /// writer properties carry file-encryption settings; otherwise returns
    /// the builder unchanged.
    #[inline]
    #[cfg(feature = "encryption")]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
            builder.set_column_crypto_metadata(get_column_crypto_metadata(
                encryption_properties,
                &self.descr,
            ))
        } else {
            builder
        }
    }
1374
    /// No-op counterpart used when the `encryption` feature is disabled; the
    /// builder is returned unchanged.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1383}
1384
/// Replaces `min` with `val` when `val` sorts below the current minimum (or
/// when no minimum has been recorded yet). NaN values are ignored (see
/// `update_stat`).
fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}
1388
/// Replaces `max` with `val` when `val` sorts above the current maximum (or
/// when no maximum has been recorded yet). NaN values are ignored (see
/// `update_stat`).
fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}
1392
/// Returns true when `val` is a floating-point NaN.
///
/// For FLOAT/DOUBLE this relies on NaN being the only value unequal to
/// itself (`val != val`), which is why `clippy::eq_op` is allowed here.
/// FIXED_LEN_BYTE_ARRAY values with a Float16 logical type are decoded from
/// their first two little-endian bytes (an FLBA(2) per the Float16 spec —
/// shorter buffers would panic on the indexing below).
#[inline]
#[allow(clippy::eq_op)]
fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::FLOAT | Type::DOUBLE => val != val,
        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
            let val = val.as_bytes();
            let val = f16::from_le_bytes([val[0], val[1]]);
            val.is_nan()
        }
        // All other physical types have no NaN representation.
        _ => false,
    }
}
1406
1407fn update_stat<T: ParquetValueType, F>(
1412 descr: &ColumnDescriptor,
1413 val: &T,
1414 cur: &mut Option<T>,
1415 should_update: F,
1416) where
1417 F: Fn(&T) -> bool,
1418{
1419 if is_nan(descr, val) {
1420 return;
1421 }
1422
1423 if cur.as_ref().is_none_or(should_update) {
1424 *cur = Some(val.clone());
1425 }
1426}
1427
/// Evaluates `a > b` under the column's logical sort order.
///
/// Dispatches on the physical type: integers annotated as unsigned (via
/// logical or converted type) compare as unsigned; byte arrays annotated as
/// DECIMAL compare as signed big-endian decimals; Float16 byte arrays
/// compare as half-precision floats. Everything else falls back to the
/// type's natural `PartialOrd`.
fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::INT32 | Type::INT64 => {
            // Newer files carry the signedness in the logical type…
            if let Some(LogicalType::Integer {
                is_signed: false, ..
            }) = descr.logical_type_ref()
            {
                return compare_greater_unsigned_int(a, b);
            }

            // …older files only in the converted type.
            match descr.converted_type() {
                ConvertedType::UINT_8
                | ConvertedType::UINT_16
                | ConvertedType::UINT_32
                | ConvertedType::UINT_64 => {
                    return compare_greater_unsigned_int(a, b);
                }
                _ => {}
            };
        }
        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
            if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            // Legacy DECIMAL annotation (converted type only).
            if let ConvertedType::DECIMAL = descr.converted_type() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            if let Some(LogicalType::Float16) = descr.logical_type_ref() {
                return compare_greater_f16(a.as_bytes(), b.as_bytes());
            }
        }

        _ => {}
    }

    a > b
}
1468
1469fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1477 match (kind, props.writer_version()) {
1478 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1479 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1480 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1481 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1482 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1483 _ => Encoding::PLAIN,
1484 }
1485}
1486
1487fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1489 match (kind, props.writer_version()) {
1490 (Type::BOOLEAN, _) => false,
1492 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1494 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1495 _ => true,
1496 }
1497}
1498
/// Compares two integer values as unsigned. Panics if `as_u64` yields `None`;
/// callers only reach this for integer physical types (see `compare_greater`).
#[inline]
fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
    a.as_u64().unwrap() > b.as_u64().unwrap()
}
1503
/// Interprets both slices as little-endian IEEE 754 half-precision values and
/// compares them. Panics if either slice is not exactly 2 bytes long
/// (`f16::from_le_bytes` requires a `[u8; 2]`).
#[inline]
fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
    let a = f16::from_le_bytes(a.try_into().unwrap());
    let b = f16::from_le_bytes(b.try_into().unwrap());
    a > b
}
1510
/// Compares two big-endian, two's-complement, variable-length decimal
/// encodings; returns true when `a > b`. A non-empty encoding sorts above an
/// empty one.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    if a.is_empty() || b.is_empty() {
        return !a.is_empty();
    }

    let first_a = a[0];
    let first_b = b[0];

    // If the sign bits differ, or both operands have the same width and their
    // leading bytes differ, the signed leading bytes decide the order.
    if (first_a ^ first_b) & 0x80 != 0 || (a.len() == b.len() && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Both operands now share a sign. A longer encoding only represents a
    // larger magnitude when its extra leading bytes are more than sign
    // extension.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a.len() != b.len() {
        let (longer, a_is_longer) = if a.len() > b.len() {
            (a, true)
        } else {
            (b, false)
        };
        let lead_len = longer.len() - a.len().min(b.len());
        if longer[..lead_len].iter().any(|&byte| byte != extension) {
            // Larger magnitude: greater when positive, smaller when negative.
            let negative = (first_a as i8) < 0;
            return if negative { !a_is_longer } else { a_is_longer };
        }
    }

    // Same sign and no significant width difference: compare the remaining
    // bytes lexicographically.
    (a[1..]) > (b[1..])
}
1556
/// Truncates a UTF-8 string to at most `length` bytes, backing up to the
/// nearest character boundary so the result is valid UTF-8. Returns `None`
/// when no non-empty prefix fits (e.g. `length == 0` or an empty input).
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let mut end = length.min(data.len());
    while end > 0 && !data.is_char_boundary(end) {
        end -= 1;
    }
    if end == 0 {
        return None;
    }
    Some(data.as_bytes()[..end].to_vec())
}
1566
1567fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1573 let lower_bound = length.saturating_sub(3);
1575 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1576 increment_utf8(data.get(..split)?)
1577}
1578
/// Produces the lexicographically next UTF-8 string by bumping one character
/// to its successor code point.
///
/// Scans from the end for a character whose successor is a valid `char` of
/// the same UTF-8 byte width (so the overall byte length is preserved); all
/// trailing characters after it are dropped. Returns `None` when no
/// character can be incremented this way.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(idx, ch)| {
        let width = ch.len_utf8();
        // Invalid successor code points (e.g. the surrogate range) make
        // `from_u32` return None, which moves the scan to the previous char.
        let next = char::from_u32(ch as u32 + 1)?;
        if next.len_utf8() != width {
            // Widening would change the encoded length; try the previous char.
            return None;
        }
        let mut out = data.as_bytes()[..idx + width].to_vec();
        next.encode_utf8(&mut out[idx..]);
        Some(out)
    })
}
1600
/// Adds one to a big-endian byte string, propagating carries from the least
/// significant (last) byte. Returns `None` when every byte overflows (the
/// input is all `0xFF` — or empty), meaning no same-length successor exists.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    for i in (0..data.len()).rev() {
        if data[i] != u8::MAX {
            data[i] += 1;
            return Some(data);
        }
        // Carry: this byte wraps to zero and the next-more-significant byte
        // is incremented on the following iteration.
        data[i] = 0;
    }
    None
}
1616
1617#[cfg(test)]
1618mod tests {
1619 use crate::{
1620 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1621 schema::parser::parse_message_type,
1622 };
1623 use core::str;
1624 use rand::distr::uniform::SampleUniform;
1625 use std::{fs::File, sync::Arc};
1626
1627 use crate::column::{
1628 page::PageReader,
1629 reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1630 };
1631 use crate::file::writer::TrackedWrite;
1632 use crate::file::{
1633 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1634 };
1635 use crate::schema::types::{ColumnPath, Type as SchemaType};
1636 use crate::util::test_common::rand_gen::random_numbers_range;
1637
1638 use super::*;
1639
    /// A batch whose definition- and repetition-level slices have different
    /// lengths must be rejected with a descriptive error.
    #[test]
    fn test_column_writer_inconsistent_def_rep_length() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
            );
        }
    }
1654
    /// A column with max definition level 1 requires definition levels;
    /// omitting them must fail.
    #[test]
    fn test_column_writer_invalid_def_levels() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Definition levels are required, because max definition level = 1"
            );
        }
    }
1669
    /// A column with max repetition level 1 requires repetition levels;
    /// omitting them must fail.
    #[test]
    fn test_column_writer_invalid_rep_levels() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Repetition levels are required, because max repetition level = 1"
            );
        }
    }
1684
    /// Definition levels implying more non-null values than were supplied
    /// must be rejected (4 levels of `1` but only 2 values).
    #[test]
    fn test_column_writer_not_enough_values_to_write() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Expected to write 4 values, but have only 2"
            );
        }
    }
1699
    /// `write_dictionary_page` may only succeed once per chunk; a second call
    /// finds the encoder's dictionary already flushed and errors.
    #[test]
    fn test_column_writer_write_only_one_dictionary_page() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        // First page dictionary is correctly written.
        writer.add_data_page().unwrap();
        writer.write_dictionary_page().unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }
1712
    /// With dictionary encoding disabled in the writer properties, asking for
    /// a dictionary page must error.
    #[test]
    fn test_column_writer_error_when_writing_disabled_dictionary() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_dictionary_enabled(false)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }
1726
1727 #[test]
1728 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1729 let page_writer = get_test_page_writer();
1730 let props = Arc::new(
1731 WriterProperties::builder()
1732 .set_dictionary_enabled(true)
1733 .build(),
1734 );
1735 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1736 writer
1737 .write_batch(&[true, false, true, false], None, None)
1738 .unwrap();
1739
1740 let r = writer.close().unwrap();
1741 assert_eq!(r.bytes_written, 1);
1744 assert_eq!(r.rows_written, 4);
1745
1746 let metadata = r.metadata;
1747 assert_eq!(
1748 metadata.encodings().collect::<Vec<_>>(),
1749 vec![Encoding::PLAIN, Encoding::RLE]
1750 );
1751 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1753 }
1754
    /// Verifies default encodings for BOOLEAN across writer versions, with
    /// dictionary encoding both enabled and disabled (booleans never use a
    /// dictionary either way).
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1790
    /// Verifies default encodings for INT32: dictionary when enabled; PLAIN
    /// (v1) or DELTA_BINARY_PACKED (v2) otherwise.
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1836
    /// Verifies default encodings for INT64: dictionary when enabled; PLAIN
    /// (v1) or DELTA_BINARY_PACKED (v2) otherwise.
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1882
    /// Verifies default encodings for INT96: dictionary when enabled, PLAIN
    /// otherwise (INT96 has no DELTA fallback even under v2).
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1924
    /// Verifies default encodings for FLOAT: dictionary when enabled, PLAIN
    /// otherwise under both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1966
    /// Verifies default encodings for DOUBLE: dictionary when enabled, PLAIN
    /// otherwise under both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
2008
    /// Verifies default encodings for BYTE_ARRAY: dictionary when enabled;
    /// PLAIN (v1) or DELTA_BYTE_ARRAY (v2) otherwise.
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2054
    /// Verifies default encodings for FIXED_LEN_BYTE_ARRAY: under v1 there is
    /// no dictionary support even when requested; v2 supports dictionary and
    /// falls back to DELTA_BYTE_ARRAY when it is disabled.
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2097
    /// End-to-end check of the closed chunk's metadata: byte/row counts,
    /// encodings, sizes, page offsets, and the auto-computed min/max/null
    /// statistics for a small INT32 batch.
    #[test]
    fn test_column_writer_check_metadata() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &1);
                assert_eq!(stats.max_opt().unwrap(), &4);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2132
    /// DECIMAL-annotated byte arrays must use signed big-endian decimal
    /// ordering for min/max: the all-0xFF-prefixed (negative) values sort
    /// below the zero-prefixed (non-negative) ones.
    #[test]
    fn test_column_writer_check_byte_array_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
        writer
            .write_batch(
                &[
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
                        152u8, 177u8, 56u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
                        0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ]),
                ],
                None,
                None,
            )
            .unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::ByteArray(stats) = stats {
                // Minimum is the most negative value, not the lexicographic one.
                assert_eq!(
                    stats.min_opt().unwrap(),
                    &ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ])
                );
                assert_eq!(
                    stats.max_opt().unwrap(),
                    &ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ])
                );
            } else {
                panic!("expecting Statistics::ByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2186
    /// INT32 columns annotated as unsigned via the legacy converted type must
    /// compute min/max with unsigned comparison semantics.
    #[test]
    fn test_column_writer_uint32_converted_type_min_max() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
            page_writer,
            0,
            0,
            props,
        );
        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &0,);
                assert_eq!(stats.max_opt().unwrap(), &5,);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2210
    /// With chunk-level statistics enabled, caller-supplied min/max/distinct
    /// values passed to `write_batch_with_statistics` must land verbatim in
    /// the chunk metadata instead of being recomputed from the data.
    #[test]
    fn test_column_writer_precalculated_statistics() {
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer
            .write_batch_with_statistics(
                &[1, 2, 3, 4],
                None,
                None,
                Some(&-17),
                Some(&9000),
                Some(55),
            )
            .unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &-17);
                assert_eq!(stats.max_opt().unwrap(), &9000);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2258
    /// Mixing `write_batch` (computed stats) with
    /// `write_batch_with_statistics` (caller-supplied stats) must merge into
    /// chunk stats spanning both batches, with the distinct count dropped
    /// (it cannot be merged), and the merged stats must round-trip through
    /// the serialized page headers.
    #[test]
    fn test_mixed_precomputed_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        // Merged chunk statistics: min from the first batch, max from the
        // second; distinct counts cannot be merged and are dropped.
        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        drop(write);

        // Re-read the serialized pages and verify the same statistics were
        // written into the data page header.
        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .set_read_page_statistics(true)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }
2317
    /// With `EnabledStatistics::None`, neither the chunk metadata nor the
    /// serialized v2 data page may carry statistics, while value/null/row
    /// counts are still recorded in the page header.
    #[test]
    fn test_disabled_statistics() {
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        assert!(r.metadata.statistics().is_none());

        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                // 6 levels, 2 of which are null (definition level 0).
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2373
2374 #[test]
2375 fn test_column_writer_empty_column_roundtrip() {
2376 let props = Default::default();
2377 column_roundtrip::<Int32Type>(props, &[], None, None);
2378 }
2379
2380 #[test]
2381 fn test_column_writer_non_nullable_values_roundtrip() {
2382 let props = Default::default();
2383 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2384 }
2385
2386 #[test]
2387 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2388 let props = Default::default();
2389 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2390 }
2391
2392 #[test]
2393 fn test_column_writer_nullable_repeated_values_roundtrip() {
2394 let props = Default::default();
2395 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2396 }
2397
2398 #[test]
2399 fn test_column_writer_dictionary_fallback_small_data_page() {
2400 let props = WriterProperties::builder()
2401 .set_dictionary_page_size_limit(32)
2402 .set_data_page_size_limit(32)
2403 .build();
2404 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2405 }
2406
2407 #[test]
2408 fn test_column_writer_small_write_batch_size() {
2409 for i in &[1usize, 2, 5, 10, 11, 1023] {
2410 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2411
2412 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2413 }
2414 }
2415
2416 #[test]
2417 fn test_column_writer_dictionary_disabled_v1() {
2418 let props = WriterProperties::builder()
2419 .set_writer_version(WriterVersion::PARQUET_1_0)
2420 .set_dictionary_enabled(false)
2421 .build();
2422 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2423 }
2424
2425 #[test]
2426 fn test_column_writer_dictionary_disabled_v2() {
2427 let props = WriterProperties::builder()
2428 .set_writer_version(WriterVersion::PARQUET_2_0)
2429 .set_dictionary_enabled(false)
2430 .build();
2431 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2432 }
2433
2434 #[test]
2435 fn test_column_writer_compression_v1() {
2436 let props = WriterProperties::builder()
2437 .set_writer_version(WriterVersion::PARQUET_1_0)
2438 .set_compression(Compression::SNAPPY)
2439 .build();
2440 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2441 }
2442
2443 #[test]
2444 fn test_column_writer_compression_v2() {
2445 let props = WriterProperties::builder()
2446 .set_writer_version(WriterVersion::PARQUET_2_0)
2447 .set_compression(Compression::SNAPPY)
2448 .build();
2449 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2450 }
2451
2452 #[test]
2453 fn test_column_writer_add_data_pages_with_dict() {
2454 let mut file = tempfile::tempfile().unwrap();
2457 let mut write = TrackedWrite::new(&mut file);
2458 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2459 let props = Arc::new(
2460 WriterProperties::builder()
2461 .set_data_page_size_limit(10)
2462 .set_write_batch_size(3) .build(),
2464 );
2465 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2466 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2467 writer.write_batch(data, None, None).unwrap();
2468 let r = writer.close().unwrap();
2469
2470 drop(write);
2471
2472 let props = ReaderProperties::builder()
2474 .set_backward_compatible_lz4(false)
2475 .build();
2476 let mut page_reader = Box::new(
2477 SerializedPageReader::new_with_properties(
2478 Arc::new(file),
2479 &r.metadata,
2480 r.rows_written as usize,
2481 None,
2482 Arc::new(props),
2483 )
2484 .unwrap(),
2485 );
2486 let mut res = Vec::new();
2487 while let Some(page) = page_reader.get_next_page().unwrap() {
2488 res.push((page.page_type(), page.num_values(), page.buffer().len()));
2489 }
2490 assert_eq!(
2491 res,
2492 vec![
2493 (PageType::DICTIONARY_PAGE, 10, 40),
2494 (PageType::DATA_PAGE, 9, 10),
2495 (PageType::DATA_PAGE, 1, 3),
2496 ]
2497 );
2498 assert_eq!(
2499 r.metadata.page_encoding_stats(),
2500 Some(&vec![
2501 PageEncodingStats {
2502 page_type: PageType::DICTIONARY_PAGE,
2503 encoding: Encoding::PLAIN,
2504 count: 1
2505 },
2506 PageEncodingStats {
2507 page_type: PageType::DATA_PAGE,
2508 encoding: Encoding::RLE_DICTIONARY,
2509 count: 2,
2510 }
2511 ])
2512 );
2513 }
2514
2515 #[test]
2516 fn test_column_writer_column_data_page_size_limit() {
2517 let props = Arc::new(
2518 WriterProperties::builder()
2519 .set_writer_version(WriterVersion::PARQUET_1_0)
2520 .set_dictionary_enabled(false)
2521 .set_data_page_size_limit(1000)
2522 .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
2523 .set_write_batch_size(3)
2524 .build(),
2525 );
2526 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2527
2528 let col_values =
2529 write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
2530 let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
2531
2532 assert_eq!(col_values, vec![3, 3, 3, 1]);
2533 assert_eq!(other_values, vec![10]);
2534 }
2535
2536 #[test]
2537 fn test_bool_statistics() {
2538 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2539 assert!(!stats.is_min_max_backwards_compatible());
2542 if let Statistics::Boolean(stats) = stats {
2543 assert_eq!(stats.min_opt().unwrap(), &false);
2544 assert_eq!(stats.max_opt().unwrap(), &true);
2545 } else {
2546 panic!("expecting Statistics::Boolean, got {stats:?}");
2547 }
2548 }
2549
2550 #[test]
2551 fn test_int32_statistics() {
2552 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2553 assert!(stats.is_min_max_backwards_compatible());
2554 if let Statistics::Int32(stats) = stats {
2555 assert_eq!(stats.min_opt().unwrap(), &-2);
2556 assert_eq!(stats.max_opt().unwrap(), &3);
2557 } else {
2558 panic!("expecting Statistics::Int32, got {stats:?}");
2559 }
2560 }
2561
2562 #[test]
2563 fn test_int64_statistics() {
2564 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2565 assert!(stats.is_min_max_backwards_compatible());
2566 if let Statistics::Int64(stats) = stats {
2567 assert_eq!(stats.min_opt().unwrap(), &-2);
2568 assert_eq!(stats.max_opt().unwrap(), &3);
2569 } else {
2570 panic!("expecting Statistics::Int64, got {stats:?}");
2571 }
2572 }
2573
2574 #[test]
2575 fn test_int96_statistics() {
2576 let input = vec![
2577 Int96::from(vec![1, 20, 30]),
2578 Int96::from(vec![3, 20, 10]),
2579 Int96::from(vec![0, 20, 30]),
2580 Int96::from(vec![2, 20, 30]),
2581 ]
2582 .into_iter()
2583 .collect::<Vec<Int96>>();
2584
2585 let stats = statistics_roundtrip::<Int96Type>(&input);
2586 assert!(!stats.is_min_max_backwards_compatible());
2587 if let Statistics::Int96(stats) = stats {
2588 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2589 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2590 } else {
2591 panic!("expecting Statistics::Int96, got {stats:?}");
2592 }
2593 }
2594
2595 #[test]
2596 fn test_float_statistics() {
2597 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2598 assert!(stats.is_min_max_backwards_compatible());
2599 if let Statistics::Float(stats) = stats {
2600 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2601 assert_eq!(stats.max_opt().unwrap(), &3.0);
2602 } else {
2603 panic!("expecting Statistics::Float, got {stats:?}");
2604 }
2605 }
2606
2607 #[test]
2608 fn test_double_statistics() {
2609 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2610 assert!(stats.is_min_max_backwards_compatible());
2611 if let Statistics::Double(stats) = stats {
2612 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2613 assert_eq!(stats.max_opt().unwrap(), &3.0);
2614 } else {
2615 panic!("expecting Statistics::Double, got {stats:?}");
2616 }
2617 }
2618
2619 #[test]
2620 fn test_byte_array_statistics() {
2621 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2622 .iter()
2623 .map(|&s| s.into())
2624 .collect::<Vec<_>>();
2625
2626 let stats = statistics_roundtrip::<ByteArrayType>(&input);
2627 assert!(!stats.is_min_max_backwards_compatible());
2628 if let Statistics::ByteArray(stats) = stats {
2629 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2630 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2631 } else {
2632 panic!("expecting Statistics::ByteArray, got {stats:?}");
2633 }
2634 }
2635
    #[test]
    fn test_fixed_len_byte_array_statistics() {
        // Fixed-length byte-array min/max use unsigned byte comparison and
        // are not considered backwards compatible.
        // NOTE(review): the literals are space-padded to the column's fixed
        // width — the exact trailing whitespace matters for min/max below.
        let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
            .iter()
            .map(|&s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
        assert!(!stats.is_min_max_backwards_compatible());
        if let Statistics::FixedLenByteArray(stats) = stats {
            // min is the lexically smallest padded value
            let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
            assert_eq!(stats.min_opt().unwrap(), &expected_min);
            // max is the lexically largest padded value
            let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
            assert_eq!(stats.max_opt().unwrap(), &expected_max);
        } else {
            panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
        }
    }
2654
2655 #[test]
2656 fn test_column_writer_check_float16_min_max() {
2657 let input = [
2658 -f16::ONE,
2659 f16::from_f32(3.0),
2660 -f16::from_f32(2.0),
2661 f16::from_f32(2.0),
2662 ]
2663 .into_iter()
2664 .map(|s| ByteArray::from(s).into())
2665 .collect::<Vec<_>>();
2666
2667 let stats = float16_statistics_roundtrip(&input);
2668 assert!(stats.is_min_max_backwards_compatible());
2669 assert_eq!(
2670 stats.min_opt().unwrap(),
2671 &ByteArray::from(-f16::from_f32(2.0))
2672 );
2673 assert_eq!(
2674 stats.max_opt().unwrap(),
2675 &ByteArray::from(f16::from_f32(3.0))
2676 );
2677 }
2678
2679 #[test]
2680 fn test_column_writer_check_float16_nan_middle() {
2681 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2682 .into_iter()
2683 .map(|s| ByteArray::from(s).into())
2684 .collect::<Vec<_>>();
2685
2686 let stats = float16_statistics_roundtrip(&input);
2687 assert!(stats.is_min_max_backwards_compatible());
2688 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2689 assert_eq!(
2690 stats.max_opt().unwrap(),
2691 &ByteArray::from(f16::ONE + f16::ONE)
2692 );
2693 }
2694
2695 #[test]
2696 fn test_float16_statistics_nan_middle() {
2697 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2698 .into_iter()
2699 .map(|s| ByteArray::from(s).into())
2700 .collect::<Vec<_>>();
2701
2702 let stats = float16_statistics_roundtrip(&input);
2703 assert!(stats.is_min_max_backwards_compatible());
2704 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2705 assert_eq!(
2706 stats.max_opt().unwrap(),
2707 &ByteArray::from(f16::ONE + f16::ONE)
2708 );
2709 }
2710
2711 #[test]
2712 fn test_float16_statistics_nan_start() {
2713 let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2714 .into_iter()
2715 .map(|s| ByteArray::from(s).into())
2716 .collect::<Vec<_>>();
2717
2718 let stats = float16_statistics_roundtrip(&input);
2719 assert!(stats.is_min_max_backwards_compatible());
2720 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2721 assert_eq!(
2722 stats.max_opt().unwrap(),
2723 &ByteArray::from(f16::ONE + f16::ONE)
2724 );
2725 }
2726
2727 #[test]
2728 fn test_float16_statistics_nan_only() {
2729 let input = [f16::NAN, f16::NAN]
2730 .into_iter()
2731 .map(|s| ByteArray::from(s).into())
2732 .collect::<Vec<_>>();
2733
2734 let stats = float16_statistics_roundtrip(&input);
2735 assert!(stats.min_bytes_opt().is_none());
2736 assert!(stats.max_bytes_opt().is_none());
2737 assert!(stats.is_min_max_backwards_compatible());
2738 }
2739
2740 #[test]
2741 fn test_float16_statistics_zero_only() {
2742 let input = [f16::ZERO]
2743 .into_iter()
2744 .map(|s| ByteArray::from(s).into())
2745 .collect::<Vec<_>>();
2746
2747 let stats = float16_statistics_roundtrip(&input);
2748 assert!(stats.is_min_max_backwards_compatible());
2749 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2750 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2751 }
2752
2753 #[test]
2754 fn test_float16_statistics_neg_zero_only() {
2755 let input = [f16::NEG_ZERO]
2756 .into_iter()
2757 .map(|s| ByteArray::from(s).into())
2758 .collect::<Vec<_>>();
2759
2760 let stats = float16_statistics_roundtrip(&input);
2761 assert!(stats.is_min_max_backwards_compatible());
2762 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2763 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2764 }
2765
2766 #[test]
2767 fn test_float16_statistics_zero_min() {
2768 let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2769 .into_iter()
2770 .map(|s| ByteArray::from(s).into())
2771 .collect::<Vec<_>>();
2772
2773 let stats = float16_statistics_roundtrip(&input);
2774 assert!(stats.is_min_max_backwards_compatible());
2775 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2776 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2777 }
2778
2779 #[test]
2780 fn test_float16_statistics_neg_zero_max() {
2781 let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2782 .into_iter()
2783 .map(|s| ByteArray::from(s).into())
2784 .collect::<Vec<_>>();
2785
2786 let stats = float16_statistics_roundtrip(&input);
2787 assert!(stats.is_min_max_backwards_compatible());
2788 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2789 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2790 }
2791
2792 #[test]
2793 fn test_float_statistics_nan_middle() {
2794 let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2795 assert!(stats.is_min_max_backwards_compatible());
2796 if let Statistics::Float(stats) = stats {
2797 assert_eq!(stats.min_opt().unwrap(), &1.0);
2798 assert_eq!(stats.max_opt().unwrap(), &2.0);
2799 } else {
2800 panic!("expecting Statistics::Float");
2801 }
2802 }
2803
2804 #[test]
2805 fn test_float_statistics_nan_start() {
2806 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2807 assert!(stats.is_min_max_backwards_compatible());
2808 if let Statistics::Float(stats) = stats {
2809 assert_eq!(stats.min_opt().unwrap(), &1.0);
2810 assert_eq!(stats.max_opt().unwrap(), &2.0);
2811 } else {
2812 panic!("expecting Statistics::Float");
2813 }
2814 }
2815
2816 #[test]
2817 fn test_float_statistics_nan_only() {
2818 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2819 assert!(stats.min_bytes_opt().is_none());
2820 assert!(stats.max_bytes_opt().is_none());
2821 assert!(stats.is_min_max_backwards_compatible());
2822 assert!(matches!(stats, Statistics::Float(_)));
2823 }
2824
2825 #[test]
2826 fn test_float_statistics_zero_only() {
2827 let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2828 assert!(stats.is_min_max_backwards_compatible());
2829 if let Statistics::Float(stats) = stats {
2830 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2831 assert!(stats.min_opt().unwrap().is_sign_negative());
2832 assert_eq!(stats.max_opt().unwrap(), &0.0);
2833 assert!(stats.max_opt().unwrap().is_sign_positive());
2834 } else {
2835 panic!("expecting Statistics::Float");
2836 }
2837 }
2838
2839 #[test]
2840 fn test_float_statistics_neg_zero_only() {
2841 let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2842 assert!(stats.is_min_max_backwards_compatible());
2843 if let Statistics::Float(stats) = stats {
2844 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2845 assert!(stats.min_opt().unwrap().is_sign_negative());
2846 assert_eq!(stats.max_opt().unwrap(), &0.0);
2847 assert!(stats.max_opt().unwrap().is_sign_positive());
2848 } else {
2849 panic!("expecting Statistics::Float");
2850 }
2851 }
2852
2853 #[test]
2854 fn test_float_statistics_zero_min() {
2855 let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2856 assert!(stats.is_min_max_backwards_compatible());
2857 if let Statistics::Float(stats) = stats {
2858 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2859 assert!(stats.min_opt().unwrap().is_sign_negative());
2860 assert_eq!(stats.max_opt().unwrap(), &2.0);
2861 } else {
2862 panic!("expecting Statistics::Float");
2863 }
2864 }
2865
2866 #[test]
2867 fn test_float_statistics_neg_zero_max() {
2868 let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2869 assert!(stats.is_min_max_backwards_compatible());
2870 if let Statistics::Float(stats) = stats {
2871 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2872 assert_eq!(stats.max_opt().unwrap(), &0.0);
2873 assert!(stats.max_opt().unwrap().is_sign_positive());
2874 } else {
2875 panic!("expecting Statistics::Float");
2876 }
2877 }
2878
2879 #[test]
2880 fn test_double_statistics_nan_middle() {
2881 let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2882 assert!(stats.is_min_max_backwards_compatible());
2883 if let Statistics::Double(stats) = stats {
2884 assert_eq!(stats.min_opt().unwrap(), &1.0);
2885 assert_eq!(stats.max_opt().unwrap(), &2.0);
2886 } else {
2887 panic!("expecting Statistics::Double");
2888 }
2889 }
2890
2891 #[test]
2892 fn test_double_statistics_nan_start() {
2893 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2894 assert!(stats.is_min_max_backwards_compatible());
2895 if let Statistics::Double(stats) = stats {
2896 assert_eq!(stats.min_opt().unwrap(), &1.0);
2897 assert_eq!(stats.max_opt().unwrap(), &2.0);
2898 } else {
2899 panic!("expecting Statistics::Double");
2900 }
2901 }
2902
2903 #[test]
2904 fn test_double_statistics_nan_only() {
2905 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2906 assert!(stats.min_bytes_opt().is_none());
2907 assert!(stats.max_bytes_opt().is_none());
2908 assert!(matches!(stats, Statistics::Double(_)));
2909 assert!(stats.is_min_max_backwards_compatible());
2910 }
2911
2912 #[test]
2913 fn test_double_statistics_zero_only() {
2914 let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2915 assert!(stats.is_min_max_backwards_compatible());
2916 if let Statistics::Double(stats) = stats {
2917 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2918 assert!(stats.min_opt().unwrap().is_sign_negative());
2919 assert_eq!(stats.max_opt().unwrap(), &0.0);
2920 assert!(stats.max_opt().unwrap().is_sign_positive());
2921 } else {
2922 panic!("expecting Statistics::Double");
2923 }
2924 }
2925
2926 #[test]
2927 fn test_double_statistics_neg_zero_only() {
2928 let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2929 assert!(stats.is_min_max_backwards_compatible());
2930 if let Statistics::Double(stats) = stats {
2931 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2932 assert!(stats.min_opt().unwrap().is_sign_negative());
2933 assert_eq!(stats.max_opt().unwrap(), &0.0);
2934 assert!(stats.max_opt().unwrap().is_sign_positive());
2935 } else {
2936 panic!("expecting Statistics::Double");
2937 }
2938 }
2939
2940 #[test]
2941 fn test_double_statistics_zero_min() {
2942 let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2943 assert!(stats.is_min_max_backwards_compatible());
2944 if let Statistics::Double(stats) = stats {
2945 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2946 assert!(stats.min_opt().unwrap().is_sign_negative());
2947 assert_eq!(stats.max_opt().unwrap(), &2.0);
2948 } else {
2949 panic!("expecting Statistics::Double");
2950 }
2951 }
2952
2953 #[test]
2954 fn test_double_statistics_neg_zero_max() {
2955 let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2956 assert!(stats.is_min_max_backwards_compatible());
2957 if let Statistics::Double(stats) = stats {
2958 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2959 assert_eq!(stats.max_opt().unwrap(), &0.0);
2960 assert!(stats.max_opt().unwrap().is_sign_positive());
2961 } else {
2962 panic!("expecting Statistics::Double");
2963 }
2964 }
2965
2966 #[test]
2967 fn test_compare_greater_byte_array_decimals() {
2968 assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2969 assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2970 assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2971 assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2972 assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2973 assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2974 assert!(!compare_greater_byte_array_decimals(
2975 &[0u8, 1u8,],
2976 &[1u8, 0u8,],
2977 ),);
2978 assert!(!compare_greater_byte_array_decimals(
2979 &[255u8, 35u8, 0u8, 0u8,],
2980 &[0u8,],
2981 ),);
2982 assert!(compare_greater_byte_array_decimals(
2983 &[0u8,],
2984 &[255u8, 35u8, 0u8, 0u8,],
2985 ),);
2986 }
2987
2988 #[test]
2989 fn test_column_index_with_null_pages() {
2990 let page_writer = get_test_page_writer();
2992 let props = Default::default();
2993 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2994 writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2995
2996 let r = writer.close().unwrap();
2997 assert!(r.column_index.is_some());
2998 let col_idx = r.column_index.unwrap();
2999 let col_idx = match col_idx {
3000 ColumnIndexMetaData::INT32(col_idx) => col_idx,
3001 _ => panic!("wrong stats type"),
3002 };
3003 assert!(col_idx.is_null_page(0));
3005 assert!(col_idx.min_value(0).is_none());
3007 assert!(col_idx.max_value(0).is_none());
3008 assert!(col_idx.null_count(0).is_some());
3010 assert_eq!(col_idx.null_count(0), Some(4));
3011 assert!(col_idx.repetition_level_histogram(0).is_none());
3013 assert!(col_idx.definition_level_histogram(0).is_some());
3015 assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
3016 }
3017
3018 #[test]
3019 fn test_column_offset_index_metadata() {
3020 let page_writer = get_test_page_writer();
3023 let props = Default::default();
3024 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3025 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3026 writer.flush_data_pages().unwrap();
3028 writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3030
3031 let r = writer.close().unwrap();
3032 let column_index = r.column_index.unwrap();
3033 let offset_index = r.offset_index.unwrap();
3034
3035 assert_eq!(8, r.rows_written);
3036
3037 let column_index = match column_index {
3039 ColumnIndexMetaData::INT32(column_index) => column_index,
3040 _ => panic!("wrong stats type"),
3041 };
3042 assert_eq!(2, column_index.num_pages());
3043 assert_eq!(2, offset_index.page_locations.len());
3044 assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3045 for idx in 0..2 {
3046 assert!(!column_index.is_null_page(idx));
3047 assert_eq!(0, column_index.null_count(0).unwrap());
3048 }
3049
3050 if let Some(stats) = r.metadata.statistics() {
3051 assert_eq!(stats.null_count_opt(), Some(0));
3052 assert_eq!(stats.distinct_count_opt(), None);
3053 if let Statistics::Int32(stats) = stats {
3054 assert_eq!(stats.min_opt(), column_index.min_value(1));
3058 assert_eq!(stats.max_opt(), column_index.max_value(1));
3059 } else {
3060 panic!("expecting Statistics::Int32");
3061 }
3062 } else {
3063 panic!("metadata missing statistics");
3064 }
3065
3066 assert_eq!(0, offset_index.page_locations[0].first_row_index);
3068 assert_eq!(4, offset_index.page_locations[1].first_row_index);
3069 }
3070
3071 #[test]
3073 fn test_column_offset_index_metadata_truncating() {
3074 let page_writer = get_test_page_writer();
3077 let props = WriterProperties::builder()
3078 .set_statistics_truncate_length(None) .build()
3080 .into();
3081 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3082
3083 let mut data = vec![FixedLenByteArray::default(); 3];
3084 data[0].set_data(Bytes::from(vec![97_u8; 200]));
3086 data[1].set_data(Bytes::from(vec![112_u8; 200]));
3088 data[2].set_data(Bytes::from(vec![98_u8; 200]));
3089
3090 writer.write_batch(&data, None, None).unwrap();
3091
3092 writer.flush_data_pages().unwrap();
3093
3094 let r = writer.close().unwrap();
3095 let column_index = r.column_index.unwrap();
3096 let offset_index = r.offset_index.unwrap();
3097
3098 let column_index = match column_index {
3099 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3100 _ => panic!("wrong stats type"),
3101 };
3102
3103 assert_eq!(3, r.rows_written);
3104
3105 assert_eq!(1, column_index.num_pages());
3107 assert_eq!(1, offset_index.page_locations.len());
3108 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3109 assert!(!column_index.is_null_page(0));
3110 assert_eq!(Some(0), column_index.null_count(0));
3111
3112 if let Some(stats) = r.metadata.statistics() {
3113 assert_eq!(stats.null_count_opt(), Some(0));
3114 assert_eq!(stats.distinct_count_opt(), None);
3115 if let Statistics::FixedLenByteArray(stats) = stats {
3116 let column_index_min_value = column_index.min_value(0).unwrap();
3117 let column_index_max_value = column_index.max_value(0).unwrap();
3118
3119 assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
3121 assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);
3122
3123 assert_eq!(
3124 column_index_min_value.len(),
3125 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3126 );
3127 assert_eq!(column_index_min_value, &[97_u8; 64]);
3128 assert_eq!(
3129 column_index_max_value.len(),
3130 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3131 );
3132
3133 assert_eq!(
3135 *column_index_max_value.last().unwrap(),
3136 *column_index_max_value.first().unwrap() + 1
3137 );
3138 } else {
3139 panic!("expecting Statistics::FixedLenByteArray");
3140 }
3141 } else {
3142 panic!("metadata missing statistics");
3143 }
3144 }
3145
3146 #[test]
3147 fn test_column_offset_index_truncating_spec_example() {
3148 let page_writer = get_test_page_writer();
3151
3152 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3154 let props = Arc::new(builder.build());
3155 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3156
3157 let mut data = vec![FixedLenByteArray::default(); 1];
3158 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3160
3161 writer.write_batch(&data, None, None).unwrap();
3162
3163 writer.flush_data_pages().unwrap();
3164
3165 let r = writer.close().unwrap();
3166 let column_index = r.column_index.unwrap();
3167 let offset_index = r.offset_index.unwrap();
3168
3169 let column_index = match column_index {
3170 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3171 _ => panic!("wrong stats type"),
3172 };
3173
3174 assert_eq!(1, r.rows_written);
3175
3176 assert_eq!(1, column_index.num_pages());
3178 assert_eq!(1, offset_index.page_locations.len());
3179 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3180 assert!(!column_index.is_null_page(0));
3181 assert_eq!(Some(0), column_index.null_count(0));
3182
3183 if let Some(stats) = r.metadata.statistics() {
3184 assert_eq!(stats.null_count_opt(), Some(0));
3185 assert_eq!(stats.distinct_count_opt(), None);
3186 if let Statistics::FixedLenByteArray(_stats) = stats {
3187 let column_index_min_value = column_index.min_value(0).unwrap();
3188 let column_index_max_value = column_index.max_value(0).unwrap();
3189
3190 assert_eq!(column_index_min_value.len(), 1);
3191 assert_eq!(column_index_max_value.len(), 1);
3192
3193 assert_eq!("B".as_bytes(), column_index_min_value);
3194 assert_eq!("C".as_bytes(), column_index_max_value);
3195
3196 assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3197 assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3198 } else {
3199 panic!("expecting Statistics::FixedLenByteArray");
3200 }
3201 } else {
3202 panic!("metadata missing statistics");
3203 }
3204 }
3205
3206 #[test]
3207 fn test_float16_min_max_no_truncation() {
3208 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3210 let props = Arc::new(builder.build());
3211 let page_writer = get_test_page_writer();
3212 let mut writer = get_test_float16_column_writer(page_writer, props);
3213
3214 let expected_value = f16::PI.to_le_bytes().to_vec();
3215 let data = vec![ByteArray::from(expected_value.clone()).into()];
3216 writer.write_batch(&data, None, None).unwrap();
3217 writer.flush_data_pages().unwrap();
3218
3219 let r = writer.close().unwrap();
3220
3221 let column_index = r.column_index.unwrap();
3224 let column_index = match column_index {
3225 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3226 _ => panic!("wrong stats type"),
3227 };
3228 let column_index_min_bytes = column_index.min_value(0).unwrap();
3229 let column_index_max_bytes = column_index.max_value(0).unwrap();
3230 assert_eq!(expected_value, column_index_min_bytes);
3231 assert_eq!(expected_value, column_index_max_bytes);
3232
3233 let stats = r.metadata.statistics().unwrap();
3235 if let Statistics::FixedLenByteArray(stats) = stats {
3236 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3237 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3238 assert_eq!(expected_value, stats_min_bytes);
3239 assert_eq!(expected_value, stats_max_bytes);
3240 } else {
3241 panic!("expecting Statistics::FixedLenByteArray");
3242 }
3243 }
3244
3245 #[test]
3246 fn test_decimal_min_max_no_truncation() {
3247 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3249 let props = Arc::new(builder.build());
3250 let page_writer = get_test_page_writer();
3251 let mut writer =
3252 get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3253
3254 let expected_value = vec![
3255 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3256 231u8, 90u8, 0u8, 0u8,
3257 ];
3258 let data = vec![ByteArray::from(expected_value.clone()).into()];
3259 writer.write_batch(&data, None, None).unwrap();
3260 writer.flush_data_pages().unwrap();
3261
3262 let r = writer.close().unwrap();
3263
3264 let column_index = r.column_index.unwrap();
3267 let column_index = match column_index {
3268 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3269 _ => panic!("wrong stats type"),
3270 };
3271 let column_index_min_bytes = column_index.min_value(0).unwrap();
3272 let column_index_max_bytes = column_index.max_value(0).unwrap();
3273 assert_eq!(expected_value, column_index_min_bytes);
3274 assert_eq!(expected_value, column_index_max_bytes);
3275
3276 let stats = r.metadata.statistics().unwrap();
3278 if let Statistics::FixedLenByteArray(stats) = stats {
3279 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3280 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3281 assert_eq!(expected_value, stats_min_bytes);
3282 assert_eq!(expected_value, stats_max_bytes);
3283 } else {
3284 panic!("expecting Statistics::FixedLenByteArray");
3285 }
3286 }
3287
3288 #[test]
3289 fn test_statistics_truncating_byte_array_default() {
3290 let page_writer = get_test_page_writer();
3291
3292 let props = WriterProperties::builder().build().into();
3294 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3295
3296 let mut data = vec![ByteArray::default(); 1];
3297 data[0].set_data(Bytes::from(String::from(
3298 "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3299 )));
3300 writer.write_batch(&data, None, None).unwrap();
3301 writer.flush_data_pages().unwrap();
3302
3303 let r = writer.close().unwrap();
3304
3305 assert_eq!(1, r.rows_written);
3306
3307 let stats = r.metadata.statistics().expect("statistics");
3308 if let Statistics::ByteArray(_stats) = stats {
3309 let min_value = _stats.min_opt().unwrap();
3310 let max_value = _stats.max_opt().unwrap();
3311
3312 assert!(!_stats.min_is_exact());
3313 assert!(!_stats.max_is_exact());
3314
3315 let expected_len = 64;
3316 assert_eq!(min_value.len(), expected_len);
3317 assert_eq!(max_value.len(), expected_len);
3318
3319 let expected_min =
3320 "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3321 assert_eq!(expected_min, min_value.as_bytes());
3322 let expected_max =
3324 "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3325 assert_eq!(expected_max, max_value.as_bytes());
3326 } else {
3327 panic!("expecting Statistics::ByteArray");
3328 }
3329 }
3330
3331 #[test]
3332 fn test_statistics_truncating_byte_array() {
3333 let page_writer = get_test_page_writer();
3334
3335 const TEST_TRUNCATE_LENGTH: usize = 1;
3336
3337 let builder =
3339 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3340 let props = Arc::new(builder.build());
3341 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3342
3343 let mut data = vec![ByteArray::default(); 1];
3344 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3346
3347 writer.write_batch(&data, None, None).unwrap();
3348
3349 writer.flush_data_pages().unwrap();
3350
3351 let r = writer.close().unwrap();
3352
3353 assert_eq!(1, r.rows_written);
3354
3355 let stats = r.metadata.statistics().expect("statistics");
3356 assert_eq!(stats.null_count_opt(), Some(0));
3357 assert_eq!(stats.distinct_count_opt(), None);
3358 if let Statistics::ByteArray(_stats) = stats {
3359 let min_value = _stats.min_opt().unwrap();
3360 let max_value = _stats.max_opt().unwrap();
3361
3362 assert!(!_stats.min_is_exact());
3363 assert!(!_stats.max_is_exact());
3364
3365 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3366 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3367
3368 assert_eq!("B".as_bytes(), min_value.as_bytes());
3369 assert_eq!("C".as_bytes(), max_value.as_bytes());
3370 } else {
3371 panic!("expecting Statistics::ByteArray");
3372 }
3373 }
3374
3375 #[test]
3376 fn test_statistics_truncating_fixed_len_byte_array() {
3377 let page_writer = get_test_page_writer();
3378
3379 const TEST_TRUNCATE_LENGTH: usize = 1;
3380
3381 let builder =
3383 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3384 let props = Arc::new(builder.build());
3385 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3386
3387 let mut data = vec![FixedLenByteArray::default(); 1];
3388
3389 const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3390 const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3391
3392 const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3394 [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3395
3396 data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3398
3399 writer.write_batch(&data, None, None).unwrap();
3400
3401 writer.flush_data_pages().unwrap();
3402
3403 let r = writer.close().unwrap();
3404
3405 assert_eq!(1, r.rows_written);
3406
3407 let stats = r.metadata.statistics().expect("statistics");
3408 assert_eq!(stats.null_count_opt(), Some(0));
3409 assert_eq!(stats.distinct_count_opt(), None);
3410 if let Statistics::FixedLenByteArray(_stats) = stats {
3411 let min_value = _stats.min_opt().unwrap();
3412 let max_value = _stats.max_opt().unwrap();
3413
3414 assert!(!_stats.min_is_exact());
3415 assert!(!_stats.max_is_exact());
3416
3417 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3418 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3419
3420 assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3421 assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3422
3423 let reconstructed_min = i128::from_be_bytes([
3424 min_value.as_bytes()[0],
3425 0,
3426 0,
3427 0,
3428 0,
3429 0,
3430 0,
3431 0,
3432 0,
3433 0,
3434 0,
3435 0,
3436 0,
3437 0,
3438 0,
3439 0,
3440 ]);
3441
3442 let reconstructed_max = i128::from_be_bytes([
3443 max_value.as_bytes()[0],
3444 0,
3445 0,
3446 0,
3447 0,
3448 0,
3449 0,
3450 0,
3451 0,
3452 0,
3453 0,
3454 0,
3455 0,
3456 0,
3457 0,
3458 0,
3459 ]);
3460
3461 println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3463 assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3464 println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3465 assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3466 } else {
3467 panic!("expecting Statistics::FixedLenByteArray");
3468 }
3469 }
3470
3471 #[test]
3472 fn test_send() {
3473 fn test<T: Send>() {}
3474 test::<ColumnWriterImpl<Int32Type>>();
3475 }
3476
3477 #[test]
3478 fn test_increment() {
3479 let v = increment(vec![0, 0, 0]).unwrap();
3480 assert_eq!(&v, &[0, 0, 1]);
3481
3482 let v = increment(vec![0, 255, 255]).unwrap();
3484 assert_eq!(&v, &[1, 0, 0]);
3485
3486 let v = increment(vec![255, 255, 255]);
3488 assert!(v.is_none());
3489 }
3490
3491 #[test]
3492 fn test_increment_utf8() {
3493 let test_inc = |o: &str, expected: &str| {
3494 if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3495 assert_eq!(v, expected);
3497 assert!(*v > *o);
3499 let mut greater = ByteArray::new();
3501 greater.set_data(Bytes::from(v));
3502 let mut original = ByteArray::new();
3503 original.set_data(Bytes::from(o.as_bytes().to_vec()));
3504 assert!(greater > original);
3505 } else {
3506 panic!("Expected incremented UTF8 string to also be valid.");
3507 }
3508 };
3509
3510 test_inc("hello", "hellp");
3512
3513 test_inc("a\u{7f}", "b");
3515
3516 assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3518
3519 test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3521
3522 test_inc("éééé", "éééê");
3524
3525 test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3527
3528 test_inc("a\u{7ff}", "b");
3530
3531 assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3533
3534 test_inc("ࠀࠀ", "ࠀࠁ");
3537
3538 test_inc("a\u{ffff}", "b");
3540
3541 assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3543
3544 test_inc("𐀀𐀀", "𐀀𐀁");
3546
3547 test_inc("a\u{10ffff}", "b");
3549
3550 assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3552
3553 test_inc("a\u{D7FF}", "b");
3556 }
3557
    #[test]
    fn test_truncate_utf8() {
        // Truncating to the exact byte length is a no-op.
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // Truncating to 13 bytes must back off to the nearest character
        // boundary (10 bytes here) rather than split a multi-byte code point.
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // A single multi-byte character cannot be truncated to one byte.
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // Truncate then increment: an ASCII tail increments in place.
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte characters: 7-byte budget keeps three characters (6 bytes),
        // then the last one is incremented.
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // Incrementing U+00FF yields U+0100, which still fits in two bytes.
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // Every character is U+07FF, the largest 2-byte code point: no
        // incrementable position exists within the budget.
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte characters increment within the same encoded width.
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // Every character is U+FFFF, the largest 3-byte code point.
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte characters increment within the same encoded width.
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // Every character is U+10FFFF, the maximum code point.
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
3609
    #[test]
    fn test_byte_array_truncate_invalid_utf8_statistics() {
        // Even for a UTF8-annotated column, statistics truncation must fall
        // back to binary truncation when the stored bytes are not valid UTF-8.
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        // 0x80 is never a valid UTF-8 leading byte; 32 of them per value.
        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
        // Seven non-null entries (level 1) interleaved with nulls (level 0),
        // matching the seven values above.
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .set_statistics_truncate_length(Some(8))
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();
        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
        assert!(!stats.max_is_exact());
        // The max bound is truncated to 8 bytes with the last byte
        // incremented (0x80 -> 0x81), binary-style.
        assert_eq!(
            stats.max_bytes_opt().map(|v| v.to_vec()),
            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
        );
    }
3652
3653 #[test]
3654 fn test_increment_max_binary_chars() {
3655 let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3656 assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3657
3658 let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3659 assert!(incremented.is_none())
3660 }
3661
3662 #[test]
3663 fn test_no_column_index_when_stats_disabled() {
3664 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3668 let props = Arc::new(
3669 WriterProperties::builder()
3670 .set_statistics_enabled(EnabledStatistics::None)
3671 .build(),
3672 );
3673 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3674 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3675
3676 let data = Vec::new();
3677 let def_levels = vec![0; 10];
3678 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3679 writer.flush_data_pages().unwrap();
3680
3681 let column_close_result = writer.close().unwrap();
3682 assert!(column_close_result.offset_index.is_some());
3683 assert!(column_close_result.column_index.is_none());
3684 }
3685
3686 #[test]
3687 fn test_no_offset_index_when_disabled() {
3688 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3690 let props = Arc::new(
3691 WriterProperties::builder()
3692 .set_statistics_enabled(EnabledStatistics::None)
3693 .set_offset_index_disabled(true)
3694 .build(),
3695 );
3696 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3697 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3698
3699 let data = Vec::new();
3700 let def_levels = vec![0; 10];
3701 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3702 writer.flush_data_pages().unwrap();
3703
3704 let column_close_result = writer.close().unwrap();
3705 assert!(column_close_result.offset_index.is_none());
3706 assert!(column_close_result.column_index.is_none());
3707 }
3708
3709 #[test]
3710 fn test_offset_index_overridden() {
3711 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3713 let props = Arc::new(
3714 WriterProperties::builder()
3715 .set_statistics_enabled(EnabledStatistics::Page)
3716 .set_offset_index_disabled(true)
3717 .build(),
3718 );
3719 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3720 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3721
3722 let data = Vec::new();
3723 let def_levels = vec![0; 10];
3724 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3725 writer.flush_data_pages().unwrap();
3726
3727 let column_close_result = writer.close().unwrap();
3728 assert!(column_close_result.offset_index.is_some());
3729 assert!(column_close_result.column_index.is_some());
3730 }
3731
3732 #[test]
3733 fn test_boundary_order() -> Result<()> {
3734 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3735 let column_close_result = write_multiple_pages::<Int32Type>(
3737 &descr,
3738 &[
3739 &[Some(-10), Some(10)],
3740 &[Some(-5), Some(11)],
3741 &[None],
3742 &[Some(-5), Some(11)],
3743 ],
3744 )?;
3745 let boundary_order = column_close_result
3746 .column_index
3747 .unwrap()
3748 .get_boundary_order();
3749 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3750
3751 let column_close_result = write_multiple_pages::<Int32Type>(
3753 &descr,
3754 &[
3755 &[Some(10), Some(11)],
3756 &[Some(5), Some(11)],
3757 &[None],
3758 &[Some(-5), Some(0)],
3759 ],
3760 )?;
3761 let boundary_order = column_close_result
3762 .column_index
3763 .unwrap()
3764 .get_boundary_order();
3765 assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3766
3767 let column_close_result = write_multiple_pages::<Int32Type>(
3769 &descr,
3770 &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3771 )?;
3772 let boundary_order = column_close_result
3773 .column_index
3774 .unwrap()
3775 .get_boundary_order();
3776 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3777
3778 let column_close_result =
3780 write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3781 let boundary_order = column_close_result
3782 .column_index
3783 .unwrap()
3784 .get_boundary_order();
3785 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3786
3787 let column_close_result =
3789 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3790 let boundary_order = column_close_result
3791 .column_index
3792 .unwrap()
3793 .get_boundary_order();
3794 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3795
3796 let column_close_result =
3798 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3799 let boundary_order = column_close_result
3800 .column_index
3801 .unwrap()
3802 .get_boundary_order();
3803 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3804
3805 let column_close_result = write_multiple_pages::<Int32Type>(
3807 &descr,
3808 &[
3809 &[Some(10), Some(11)],
3810 &[Some(11), Some(16)],
3811 &[None],
3812 &[Some(-5), Some(0)],
3813 ],
3814 )?;
3815 let boundary_order = column_close_result
3816 .column_index
3817 .unwrap()
3818 .get_boundary_order();
3819 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3820
3821 let column_close_result = write_multiple_pages::<Int32Type>(
3823 &descr,
3824 &[
3825 &[Some(1), Some(9)],
3826 &[Some(2), Some(8)],
3827 &[None],
3828 &[Some(3), Some(7)],
3829 ],
3830 )?;
3831 let boundary_order = column_close_result
3832 .column_index
3833 .unwrap()
3834 .get_boundary_order();
3835 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3836
3837 Ok(())
3838 }
3839
    #[test]
    fn test_boundary_order_logical_type() -> Result<()> {
        // The same bytes must compare differently depending on the logical
        // type: as Float16 they have a numeric order, as plain
        // FIXED_LEN_BYTE_ARRAY they are compared as raw bytes.
        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
        // A 2-byte FLBA column with no logical type, for contrast.
        let fba_descr = {
            let tpe = SchemaType::primitive_type_builder(
                "col",
                FixedLenByteArrayType::get_physical_type(),
            )
            .with_length(2)
            .build()?;
            Arc::new(ColumnDescriptor::new(
                Arc::new(tpe),
                1,
                0,
                ColumnPath::from("col"),
            ))
        };

        // One value per page: 1.0, 0.0, -0.0, -1.0.
        let values: &[&[Option<FixedLenByteArray>]] = &[
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(
                f16::NEG_ZERO,
            )))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
        ];

        // Interpreted as Float16 the page bounds strictly decrease.
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));

        // Interpreted as raw bytes the same pages are not monotonic.
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        Ok(())
    }
3889
3890 #[test]
3891 fn test_interval_stats_should_not_have_min_max() {
3892 let input = [
3893 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3894 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3895 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3896 ]
3897 .into_iter()
3898 .map(|s| ByteArray::from(s).into())
3899 .collect::<Vec<_>>();
3900
3901 let page_writer = get_test_page_writer();
3902 let mut writer = get_test_interval_column_writer(page_writer);
3903 writer.write_batch(&input, None, None).unwrap();
3904
3905 let metadata = writer.close().unwrap().metadata;
3906 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3907 stats.clone()
3908 } else {
3909 panic!("metadata missing statistics");
3910 };
3911 assert!(stats.min_bytes_opt().is_none());
3912 assert!(stats.max_bytes_opt().is_none());
3913 }
3914
    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        // Nothing buffered yet, so the estimate starts at zero.
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        // 20 bytes for one flushed page of four i32 values
        // (NOTE(review): constant is tied to the current encoder/page-header
        // layout — update if either changes).
        assert_eq!(size_with_one_page, 20);

        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // The second page adds 21 bytes; the estimate accumulates per page.
        assert_eq!(size_with_two_pages, 20 + 21);
    }
3934
3935 fn write_multiple_pages<T: DataType>(
3936 column_descr: &Arc<ColumnDescriptor>,
3937 pages: &[&[Option<T::T>]],
3938 ) -> Result<ColumnCloseResult> {
3939 let column_writer = get_column_writer(
3940 column_descr.clone(),
3941 Default::default(),
3942 get_test_page_writer(),
3943 );
3944 let mut writer = get_typed_column_writer::<T>(column_writer);
3945
3946 for &page in pages {
3947 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3948 let def_levels = page
3949 .iter()
3950 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3951 .collect::<Vec<_>>();
3952 writer.write_batch(&values, Some(&def_levels), None)?;
3953 writer.flush_data_pages()?;
3954 }
3955
3956 writer.close()
3957 }
3958
3959 fn column_roundtrip_random<T: DataType>(
3963 props: WriterProperties,
3964 max_size: usize,
3965 min_value: T::T,
3966 max_value: T::T,
3967 max_def_level: i16,
3968 max_rep_level: i16,
3969 ) where
3970 T::T: PartialOrd + SampleUniform + Copy,
3971 {
3972 let mut num_values: usize = 0;
3973
3974 let mut buf: Vec<i16> = Vec::new();
3975 let def_levels = if max_def_level > 0 {
3976 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3977 for &dl in &buf[..] {
3978 if dl == max_def_level {
3979 num_values += 1;
3980 }
3981 }
3982 Some(&buf[..])
3983 } else {
3984 num_values = max_size;
3985 None
3986 };
3987
3988 let mut buf: Vec<i16> = Vec::new();
3989 let rep_levels = if max_rep_level > 0 {
3990 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3991 buf[0] = 0; Some(&buf[..])
3993 } else {
3994 None
3995 };
3996
3997 let mut values: Vec<T::T> = Vec::new();
3998 random_numbers_range(num_values, min_value, max_value, &mut values);
3999
4000 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
4001 }
4002
    /// Writes `values` (with optional def/rep levels) to a temp file through a
    /// serialized page writer, reads them back with a column reader, and
    /// asserts the round-trip reproduces values, levels, and row count.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        // Derive the column's max levels from the data actually supplied.
        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // Read back everything in a single call: batch size covers the
        // longest of the value/level slices.
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // Release the borrow on `file` before handing it to the reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        let mut actual_values = Vec::with_capacity(max_batch_size);
        // Only collect levels that were written in the first place.
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Values and levels must round-trip unchanged.
        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Cross-check the reported row count: with repetition levels, each
        // level-0 entry starts a new record; otherwise rows equal levels or
        // values read.
        if let Some(levels) = actual_rep_levels {
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
4097
4098 fn column_write_and_get_metadata<T: DataType>(
4101 props: WriterProperties,
4102 values: &[T::T],
4103 ) -> ColumnChunkMetaData {
4104 let page_writer = get_test_page_writer();
4105 let props = Arc::new(props);
4106 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4107 writer.write_batch(values, None, None).unwrap();
4108 writer.close().unwrap().metadata
4109 }
4110
    /// Convenience constructor for a [`PageEncodingStats`] used in test
    /// expectations.
    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
        PageEncodingStats {
            page_type,
            encoding,
            count,
        }
    }
4119
    /// Writes `data` under the given writer version / dictionary setting and
    /// verifies the dictionary page offset, the set of encodings, and the
    /// per-page encoding statistics recorded in the chunk metadata.
    fn check_encoding_write_support<T: DataType>(
        version: WriterVersion,
        dict_enabled: bool,
        data: &[T::T],
        dictionary_page_offset: Option<i64>,
        encodings: &[Encoding],
        page_encoding_stats: &[PageEncodingStats],
    ) {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(dict_enabled)
            .build();
        let meta = column_write_and_get_metadata::<T>(props, data);
        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
    }
4140
4141 fn get_test_column_writer<'a, T: DataType>(
4143 page_writer: Box<dyn PageWriter + 'a>,
4144 max_def_level: i16,
4145 max_rep_level: i16,
4146 props: WriterPropertiesPtr,
4147 ) -> ColumnWriterImpl<'a, T> {
4148 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4149 let column_writer = get_column_writer(descr, props, page_writer);
4150 get_typed_column_writer::<T>(column_writer)
4151 }
4152
4153 fn get_test_column_writer_with_path<'a, T: DataType>(
4154 page_writer: Box<dyn PageWriter + 'a>,
4155 max_def_level: i16,
4156 max_rep_level: i16,
4157 props: WriterPropertiesPtr,
4158 path: ColumnPath,
4159 ) -> ColumnWriterImpl<'a, T> {
4160 let descr = Arc::new(get_test_column_descr_with_path::<T>(
4161 max_def_level,
4162 max_rep_level,
4163 path,
4164 ));
4165 let column_writer = get_column_writer(descr, props, page_writer);
4166 get_typed_column_writer::<T>(column_writer)
4167 }
4168
4169 fn get_test_column_reader<T: DataType>(
4171 page_reader: Box<dyn PageReader>,
4172 max_def_level: i16,
4173 max_rep_level: i16,
4174 ) -> ColumnReaderImpl<T> {
4175 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4176 let column_reader = get_column_reader(descr, page_reader);
4177 get_typed_column_reader::<T>(column_reader)
4178 }
4179
4180 fn get_test_column_descr<T: DataType>(
4182 max_def_level: i16,
4183 max_rep_level: i16,
4184 ) -> ColumnDescriptor {
4185 let path = ColumnPath::from("col");
4186 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4187 .with_length(1)
4190 .build()
4191 .unwrap();
4192 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4193 }
4194
    /// Same as [`get_test_column_descr`] but names the column after `path`.
    fn get_test_column_descr_with_path<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
        path: ColumnPath,
    ) -> ColumnDescriptor {
        let name = path.string();
        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
            // length only applies to FIXED_LEN_BYTE_ARRAY physical types
            .with_length(1)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4209
    /// Writes `data` through a serialized page writer for the column at
    /// `path`, reads the file back, and returns the number of values found in
    /// each data page.
    fn write_and_collect_page_values(
        path: ColumnPath,
        props: WriterPropertiesPtr,
        data: &[i32],
    ) -> Vec<u32> {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let mut writer =
            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the borrow on `file` before handing it to the reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );

        let mut values_per_page = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            // Only v1 data pages are expected in this setup.
            assert_eq!(page.page_type(), PageType::DATA_PAGE);
            values_per_page.push(page.num_values());
        }

        values_per_page
    }
4247
    /// Returns a [`PageWriter`] that discards all page data; used by tests
    /// that only inspect writer-side metadata and statistics.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }
4252
    /// A no-op page writer: pages are measured but never persisted anywhere.
    struct TestPageWriter {}

    impl PageWriter for TestPageWriter {
        /// Records the page's type, sizes and value count in the returned
        /// spec, then drops the page data.
        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
            let mut res = PageWriteSpec::new();
            res.page_type = page.page_type();
            res.uncompressed_size = page.uncompressed_size();
            res.compressed_size = page.compressed_size();
            res.num_values = page.num_values();
            // There is no real sink: the offset is fixed at 0 and
            // bytes_written reflects only the page payload length.
            res.offset = 0;
            res.bytes_written = page.data().len() as u64;
            Ok(res)
        }

        fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }
4271
4272 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4274 let page_writer = get_test_page_writer();
4275 let props = Default::default();
4276 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4277 writer.write_batch(values, None, None).unwrap();
4278
4279 let metadata = writer.close().unwrap().metadata;
4280 if let Some(stats) = metadata.statistics() {
4281 stats.clone()
4282 } else {
4283 panic!("metadata missing statistics");
4284 }
4285 }
4286
4287 fn get_test_decimals_column_writer<T: DataType>(
4289 page_writer: Box<dyn PageWriter>,
4290 max_def_level: i16,
4291 max_rep_level: i16,
4292 props: WriterPropertiesPtr,
4293 ) -> ColumnWriterImpl<'static, T> {
4294 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4295 max_def_level,
4296 max_rep_level,
4297 ));
4298 let column_writer = get_column_writer(descr, props, page_writer);
4299 get_typed_column_writer::<T>(column_writer)
4300 }
4301
    /// Column descriptor for a 16-byte FIXED_LEN_BYTE_ARRAY decimal column
    /// with precision 3 and scale 2, used by the decimal statistics tests.
    fn get_test_decimals_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            // NOTE(review): 16 bytes is far wider than precision 3 requires —
            // presumably intentional for exercising truncation; confirm.
            .with_length(16)
            .with_logical_type(Some(LogicalType::Decimal {
                scale: 2,
                precision: 3,
            }))
            .with_scale(2)
            .with_precision(3)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4320
4321 fn float16_statistics_roundtrip(
4322 values: &[FixedLenByteArray],
4323 ) -> ValueStatistics<FixedLenByteArray> {
4324 let page_writer = get_test_page_writer();
4325 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4326 writer.write_batch(values, None, None).unwrap();
4327
4328 let metadata = writer.close().unwrap().metadata;
4329 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4330 stats.clone()
4331 } else {
4332 panic!("metadata missing statistics");
4333 }
4334 }
4335
4336 fn get_test_float16_column_writer(
4337 page_writer: Box<dyn PageWriter>,
4338 props: WriterPropertiesPtr,
4339 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4340 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4341 let column_writer = get_column_writer(descr, props, page_writer);
4342 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4343 }
4344
4345 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4346 let path = ColumnPath::from("col");
4347 let tpe =
4348 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4349 .with_length(2)
4350 .with_logical_type(Some(LogicalType::Float16))
4351 .build()
4352 .unwrap();
4353 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4354 }
4355
4356 fn get_test_interval_column_writer(
4357 page_writer: Box<dyn PageWriter>,
4358 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4359 let descr = Arc::new(get_test_interval_column_descr());
4360 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4361 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4362 }
4363
4364 fn get_test_interval_column_descr() -> ColumnDescriptor {
4365 let path = ColumnPath::from("col");
4366 let tpe =
4367 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4368 .with_length(12)
4369 .with_converted_type(ConvertedType::INTERVAL)
4370 .build()
4371 .unwrap();
4372 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4373 }
4374
4375 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4377 page_writer: Box<dyn PageWriter + 'a>,
4378 max_def_level: i16,
4379 max_rep_level: i16,
4380 props: WriterPropertiesPtr,
4381 ) -> ColumnWriterImpl<'a, T> {
4382 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4383 max_def_level,
4384 max_rep_level,
4385 ));
4386 let column_writer = get_column_writer(descr, props, page_writer);
4387 get_typed_column_writer::<T>(column_writer)
4388 }
4389
4390 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4392 max_def_level: i16,
4393 max_rep_level: i16,
4394 ) -> ColumnDescriptor {
4395 let path = ColumnPath::from("col");
4396 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4397 .with_converted_type(ConvertedType::UINT_32)
4398 .build()
4399 .unwrap();
4400 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4401 }
4402
4403 #[test]
4404 fn test_page_v2_snappy_compression_fallback() {
4405 let page_writer = TestPageWriter {};
4407
4408 let props = WriterProperties::builder()
4410 .set_writer_version(WriterVersion::PARQUET_2_0)
4411 .set_dictionary_enabled(false)
4413 .set_compression(Compression::SNAPPY)
4414 .build();
4415
4416 let mut column_writer =
4417 get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4418
4419 let values = vec![ByteArray::from("a")];
4422
4423 column_writer.write_batch(&values, None, None).unwrap();
4424
4425 let result = column_writer.close().unwrap();
4426 assert_eq!(
4427 result.metadata.uncompressed_size(),
4428 result.metadata.compressed_size()
4429 );
4430 }
4431}