1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30 BoundaryOrder, Compression, ConvertedType, Encoding, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43 OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
// Dispatches `$body` over every `ColumnWriter` variant.
//
// `$writer` is matched against `Self::*` variants, so this macro is only usable inside an
// `impl ColumnWriter` block; in each arm the wrapped typed writer is bound to `$inner` and
// `$body` is evaluated with it.
macro_rules! downcast_writer {
    ($writer:expr, $inner:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($inner) => $body,
            Self::Int32ColumnWriter($inner) => $body,
            Self::Int64ColumnWriter($inner) => $body,
            Self::Int96ColumnWriter($inner) => $body,
            Self::FloatColumnWriter($inner) => $body,
            Self::DoubleColumnWriter($inner) => $body,
            Self::ByteArrayColumnWriter($inner) => $body,
            Self::FixedLenByteArrayColumnWriter($inner) => $body,
        }
    };
}
67
/// Column writer for a Parquet column, with one variant per physical type.
///
/// Each variant wraps a [`ColumnWriterImpl`] specialised for the matching [`DataType`].
/// Build one with [`get_column_writer`], which picks the variant from the column's
/// physical type. Obtain the typed writer with [`get_typed_column_writer`],
/// [`get_typed_column_writer_ref`] or [`get_typed_column_writer_mut`].
pub enum ColumnWriter<'a> {
    /// Writer for the `BOOLEAN` physical type.
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Writer for the `INT32` physical type.
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Writer for the `INT64` physical type.
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Writer for the `INT96` physical type.
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Writer for the `FLOAT` physical type.
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Writer for the `DOUBLE` physical type.
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Writer for the `BYTE_ARRAY` physical type.
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Writer for the `FIXED_LEN_BYTE_ARRAY` physical type.
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
90impl ColumnWriter<'_> {
91 #[cfg(feature = "arrow")]
93 pub(crate) fn memory_size(&self) -> usize {
94 downcast_writer!(self, typed, typed.memory_size())
95 }
96
97 #[cfg(feature = "arrow")]
99 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
100 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
101 }
102
103 pub fn close(self) -> Result<ColumnCloseResult> {
105 downcast_writer!(self, typed, typed.close())
106 }
107}
108
109pub fn get_column_writer<'a>(
111 descr: ColumnDescPtr,
112 props: WriterPropertiesPtr,
113 page_writer: Box<dyn PageWriter + 'a>,
114) -> ColumnWriter<'a> {
115 match descr.physical_type() {
116 Type::BOOLEAN => {
117 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
118 }
119 Type::INT32 => {
120 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
121 }
122 Type::INT64 => {
123 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124 }
125 Type::INT96 => {
126 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127 }
128 Type::FLOAT => {
129 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130 }
131 Type::DOUBLE => {
132 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133 }
134 Type::BYTE_ARRAY => {
135 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136 }
137 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
138 ColumnWriterImpl::new(descr, props, page_writer),
139 ),
140 }
141}
142
143pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
148 T::get_column_writer(col_writer).unwrap_or_else(|| {
149 panic!(
150 "Failed to convert column writer into a typed column writer for `{}` type",
151 T::get_physical_type()
152 )
153 })
154}
155
156pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
158 col_writer: &'b ColumnWriter<'a>,
159) -> &'b ColumnWriterImpl<'a, T> {
160 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
161 panic!(
162 "Failed to convert column writer into a typed column writer for `{}` type",
163 T::get_physical_type()
164 )
165 })
166}
167
168pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
170 col_writer: &'a mut ColumnWriter<'b>,
171) -> &'a mut ColumnWriterImpl<'b, T> {
172 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
173 panic!(
174 "Failed to convert column writer into a typed column writer for `{}` type",
175 T::get_physical_type()
176 )
177 })
178}
179
/// Everything produced when a column writer is closed.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total bytes written for this column chunk (the writer's running `total_bytes_written`).
    pub bytes_written: u64,
    /// Total number of rows written to this column chunk.
    pub rows_written: u64,
    /// Metadata describing the written column chunk.
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter flushed from the encoder, if the encoder produced one.
    pub bloom_filter: Option<Sbbf>,
    /// Column index, or `None` when the column-index builder was invalidated (for example
    /// because page-level statistics were not collected).
    pub column_index: Option<ColumnIndexMetaData>,
    /// Offset index, or `None` when the offset index is disabled in the writer properties.
    pub offset_index: Option<OffsetIndexMetaData>,
}
198
/// Counters for the data page currently being buffered; reset by `new_page`.
#[derive(Default)]
struct PageMetrics {
    // Level slots buffered in this page (nulls included).
    num_buffered_values: u32,
    // Rows started in this page (repetition level 0, or one per slot when unrepeated).
    num_buffered_rows: u32,
    // Level slots below the column's max definition level.
    num_page_nulls: u64,
    // Per-page level histograms; `None` unless enabled via the `with_*_level_histogram` builders.
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
208
209impl PageMetrics {
210 fn new() -> Self {
211 Default::default()
212 }
213
214 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
216 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
217 self
218 }
219
220 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
222 self.definition_level_histogram = LevelHistogram::try_new(max_level);
223 self
224 }
225
226 fn new_page(&mut self) {
229 self.num_buffered_values = 0;
230 self.num_buffered_rows = 0;
231 self.num_page_nulls = 0;
232 self.repetition_level_histogram
233 .as_mut()
234 .map(LevelHistogram::reset);
235 self.definition_level_histogram
236 .as_mut()
237 .map(LevelHistogram::reset);
238 }
239
240 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
242 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
243 rep_hist.update_from_levels(levels);
244 }
245 }
246
247 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
249 if let Some(ref mut def_hist) = self.definition_level_histogram {
250 def_hist.update_from_levels(levels);
251 }
252 }
253}
254
/// Running totals for the whole column chunk, accumulated page by page.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    // Byte and row/value totals, updated as pages are written.
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    // File offsets of the dictionary page and the first data page, once written.
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    // Chunk-level bounds folded in via `update_min` / `update_max`.
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    // Caller-supplied distinct count; cleared when it can no longer describe the whole chunk.
    column_distinct_count: Option<u64>,
    // Sum of unencoded byte-array sizes reported by pages; `None` if no page reported one.
    variable_length_bytes: Option<i64>,
    // Chunk-level level histograms, summed from the per-page histograms.
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}
273
274impl<T: Default> ColumnMetrics<T> {
275 fn new() -> Self {
276 Default::default()
277 }
278
279 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
281 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
282 self
283 }
284
285 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
287 self.definition_level_histogram = LevelHistogram::try_new(max_level);
288 self
289 }
290
291 fn update_histogram(
293 chunk_histogram: &mut Option<LevelHistogram>,
294 page_histogram: &Option<LevelHistogram>,
295 ) {
296 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
297 chunk_hist.add(page_hist);
298 }
299 }
300
301 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
304 ColumnMetrics::<T>::update_histogram(
305 &mut self.definition_level_histogram,
306 &page_metrics.definition_level_histogram,
307 );
308 ColumnMetrics::<T>::update_histogram(
309 &mut self.repetition_level_histogram,
310 &page_metrics.repetition_level_histogram,
311 );
312 }
313
314 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
316 if let Some(var_bytes) = variable_length_bytes {
317 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
318 }
319 }
320}
321
/// Typed column writer for `DataType` `T`, using the default value encoder
/// `ColumnValueEncoderImpl<T>`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
324
/// Typed column writer, generic over the value encoder `E`.
///
/// Values and levels are buffered into pages. Finished pages are written through
/// `page_writer`. While the encoder has a dictionary, they are queued in `data_pages`
/// instead, so that the dictionary page can be written ahead of them.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column descriptor and writer properties.
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    // Statistics level resolved from `props` for this column's path.
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    // Compressor created from `codec`; `None` means pages are stored uncompressed.
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    // State of the page being buffered, and running totals for the whole chunk.
    page_metrics: PageMetrics,
    column_metrics: ColumnMetrics<E::T>,

    // Every encoding used in this chunk (RLE is always included, for levels).
    encodings: BTreeSet<Encoding>,
    // Run-length summary of (page type, encoding) for pages written so far.
    encoding_stats: Vec<PageEncodingStats>,
    // Levels buffered for the current page.
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    // Finished data pages held back while the encoder has a dictionary.
    data_pages: VecDeque<CompressedPage>,
    // Column index under construction; invalidated unless page statistics are enabled.
    column_index_builder: ColumnIndexBuilder,
    // Offset index under construction; `None` when disabled in `props`.
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Whether the non-null pages' min/max seen so far are still ascending / descending;
    // used to choose the column index boundary order on close.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    // (min, max) of the most recent page that produced statistics.
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
360
361impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
362 pub fn new(
364 descr: ColumnDescPtr,
365 props: WriterPropertiesPtr,
366 page_writer: Box<dyn PageWriter + 'a>,
367 ) -> Self {
368 let codec = props.compression(descr.path());
369 let codec_options = CodecOptionsBuilder::default().build();
370 let compressor = create_codec(codec, &codec_options).unwrap();
371 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
372
373 let statistics_enabled = props.statistics_enabled(descr.path());
374
375 let mut encodings = BTreeSet::new();
376 encodings.insert(Encoding::RLE);
378
379 let mut page_metrics = PageMetrics::new();
380 let mut column_metrics = ColumnMetrics::<E::T>::new();
381
382 if statistics_enabled != EnabledStatistics::None {
384 page_metrics = page_metrics
385 .with_repetition_level_histogram(descr.max_rep_level())
386 .with_definition_level_histogram(descr.max_def_level());
387 column_metrics = column_metrics
388 .with_repetition_level_histogram(descr.max_rep_level())
389 .with_definition_level_histogram(descr.max_def_level())
390 }
391
392 let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
394 if statistics_enabled != EnabledStatistics::Page {
395 column_index_builder.to_invalid()
396 }
397
398 let offset_index_builder = match props.offset_index_disabled() {
400 false => Some(OffsetIndexBuilder::new()),
401 _ => None,
402 };
403
404 Self {
405 descr,
406 props,
407 statistics_enabled,
408 page_writer,
409 codec,
410 compressor,
411 encoder,
412 def_levels_sink: vec![],
413 rep_levels_sink: vec![],
414 data_pages: VecDeque::new(),
415 page_metrics,
416 column_metrics,
417 column_index_builder,
418 offset_index_builder,
419 encodings,
420 encoding_stats: vec![],
421 data_page_boundary_ascending: true,
422 data_page_boundary_descending: true,
423 last_non_null_data_page_min_max: None,
424 }
425 }
426
427 #[allow(clippy::too_many_arguments)]
428 pub(crate) fn write_batch_internal(
429 &mut self,
430 values: &E::Values,
431 value_indices: Option<&[usize]>,
432 def_levels: Option<&[i16]>,
433 rep_levels: Option<&[i16]>,
434 min: Option<&E::T>,
435 max: Option<&E::T>,
436 distinct_count: Option<u64>,
437 ) -> Result<usize> {
438 if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
440 if def.len() != rep.len() {
441 return Err(general_err!(
442 "Inconsistent length of definition and repetition levels: {} != {}",
443 def.len(),
444 rep.len()
445 ));
446 }
447 }
448
449 let num_levels = match def_levels {
460 Some(def_levels) => def_levels.len(),
461 None => values.len(),
462 };
463
464 if let Some(min) = min {
465 update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
466 }
467 if let Some(max) = max {
468 update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
469 }
470
471 if self.encoder.num_values() == 0 {
473 self.column_metrics.column_distinct_count = distinct_count;
474 } else {
475 self.column_metrics.column_distinct_count = None;
476 }
477
478 let mut values_offset = 0;
479 let mut levels_offset = 0;
480 let base_batch_size = self.props.write_batch_size();
481 while levels_offset < num_levels {
482 let mut end_offset = num_levels.min(levels_offset + base_batch_size);
483
484 if let Some(r) = rep_levels {
486 while end_offset < r.len() && r[end_offset] != 0 {
487 end_offset += 1;
488 }
489 }
490
491 values_offset += self.write_mini_batch(
492 values,
493 values_offset,
494 value_indices,
495 end_offset - levels_offset,
496 def_levels.map(|lv| &lv[levels_offset..end_offset]),
497 rep_levels.map(|lv| &lv[levels_offset..end_offset]),
498 )?;
499 levels_offset = end_offset;
500 }
501
502 Ok(values_offset)
504 }
505
506 pub fn write_batch(
519 &mut self,
520 values: &E::Values,
521 def_levels: Option<&[i16]>,
522 rep_levels: Option<&[i16]>,
523 ) -> Result<usize> {
524 self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
525 }
526
527 pub fn write_batch_with_statistics(
535 &mut self,
536 values: &E::Values,
537 def_levels: Option<&[i16]>,
538 rep_levels: Option<&[i16]>,
539 min: Option<&E::T>,
540 max: Option<&E::T>,
541 distinct_count: Option<u64>,
542 ) -> Result<usize> {
543 self.write_batch_internal(
544 values,
545 None,
546 def_levels,
547 rep_levels,
548 min,
549 max,
550 distinct_count,
551 )
552 }
553
554 #[cfg(feature = "arrow")]
559 pub(crate) fn memory_size(&self) -> usize {
560 self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
561 }
562
    /// Returns the number of bytes written for this column so far.
    ///
    /// Only pages already handed to the page writer are counted. Values buffered in the
    /// encoder and data pages queued behind a dictionary are not included.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
571
572 #[cfg(feature = "arrow")]
578 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
579 self.data_pages
580 .iter()
581 .map(|page| page.data().len() as u64)
582 .sum::<u64>()
583 + self.column_metrics.total_bytes_written
584 + self.encoder.estimated_data_page_size() as u64
585 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
586 }
587
    /// Returns the number of rows in data pages finalised so far.
    ///
    /// Rows still buffered in the current page are not counted.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
593
    /// Returns the descriptor of the column this writer writes.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
598
599 pub fn close(mut self) -> Result<ColumnCloseResult> {
602 if self.page_metrics.num_buffered_values > 0 {
603 self.add_data_page()?;
604 }
605 if self.encoder.has_dictionary() {
606 self.write_dictionary_page()?;
607 }
608 self.flush_data_pages()?;
609 let metadata = self.build_column_metadata()?;
610 self.page_writer.close()?;
611
612 let boundary_order = match (
613 self.data_page_boundary_ascending,
614 self.data_page_boundary_descending,
615 ) {
616 (true, _) => BoundaryOrder::ASCENDING,
619 (false, true) => BoundaryOrder::DESCENDING,
620 (false, false) => BoundaryOrder::UNORDERED,
621 };
622 self.column_index_builder.set_boundary_order(boundary_order);
623
624 let column_index = match self.column_index_builder.valid() {
625 true => Some(self.column_index_builder.build()?),
626 false => None,
627 };
628
629 let offset_index = self.offset_index_builder.map(|b| b.build());
630
631 Ok(ColumnCloseResult {
632 bytes_written: self.column_metrics.total_bytes_written,
633 rows_written: self.column_metrics.total_rows_written,
634 bloom_filter: self.encoder.flush_bloom_filter(),
635 metadata,
636 column_index,
637 offset_index,
638 })
639 }
640
    /// Buffers one mini-batch of `num_levels` level slots into the current page.
    ///
    /// `values_offset` is the index of the first value belonging to this mini-batch. When
    /// `value_indices` is given, it is instead the index of the first gather index. Returns
    /// the number of values written, i.e. slots at the max definition level, or every slot
    /// when the column has no definition levels. The caller adds this to `values_offset`.
    ///
    /// After buffering, a page is cut if a size or row limit is hit, and the dictionary
    /// fallback runs if the dictionary grew past its limit.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - definition levels are missing while the max definition level is > 0;
    /// - repetition levels are missing while the max repetition level is > 0;
    /// - the repetition levels do not start at a record boundary (first level non-zero);
    /// - encoding, page flushing or the dictionary fallback fails.
    ///
    /// NOTE(review): `def_levels_sink` and the page null count are updated before the
    /// repetition-level checks run, so a repetition-level error leaves them already updated.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Definition levels decide how many slots actually carry a value.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            // Only slots at the max definition level carry a value.
            let values_to_write = levels
                .iter()
                .map(|level| (*level == self.descr.max_def_level()) as usize)
                .sum();
            // Every other slot is counted as a null for this page.
            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;

            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            // No definition levels: every slot carries a value.
            num_levels
        };

        // Repetition levels decide how many rows this mini-batch starts.
        if self.descr.max_rep_level() > 0 {
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // A repetition level of 0 starts a new row.
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Unrepeated column: each slot (nulls included) is one row.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        // Hand the values to the encoder, either gathered through indices or as a contiguous run.
        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        // Page limits are checked only after the whole mini-batch is buffered.
        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
729
730 #[inline]
735 fn should_dict_fallback(&self) -> bool {
736 match self.encoder.estimated_dict_page_size() {
737 Some(size) => {
738 size >= self
739 .props
740 .column_dictionary_page_size_limit(self.descr.path())
741 }
742 None => false,
743 }
744 }
745
746 #[inline]
748 fn should_add_data_page(&self) -> bool {
749 if self.page_metrics.num_buffered_values == 0 {
754 return false;
755 }
756
757 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
758 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
759 }
760
761 fn dict_fallback(&mut self) -> Result<()> {
764 if self.page_metrics.num_buffered_values > 0 {
766 self.add_data_page()?;
767 }
768 self.write_dictionary_page()?;
769 self.flush_data_pages()?;
770 Ok(())
771 }
772
    /// Records the page being finalised in the column index and offset index builders.
    ///
    /// Column index (only while its builder is still valid):
    /// - A null page (row count equal to page null count) gets empty min/max entries.
    /// - Otherwise the page's min/max bytes are appended, truncated when the type allows it.
    /// - Missing page statistics invalidate the column index for the rest of the chunk.
    /// - The ascending/descending boundary flags are updated against the previous non-null
    ///   page.
    ///
    /// Page level histograms are always offered to the column index builder. The offset
    /// index, if enabled, receives the page row count and unencoded byte-array size.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // NOTE(review): compares rows with null *slots*; for repeated columns these counts
        // differ in kind — confirm this is the intended null-page test.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        if null_page && self.column_index_builder.valid() {
            // All-null page: record empty min/max.
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            match &page_statistics {
                // No page statistics: the column index can no longer be complete.
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // A previous bound greater than the new one breaks ascending order.
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // A new bound greater than the previous one breaks descending order.
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    // Byte-array-like types store truncated bounds; others store them verbatim.
                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Page level histograms go to the column index builder regardless of the branch above.
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Offset index: row count and unencoded byte-array size for this page.
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
862
863 fn can_truncate_value(&self) -> bool {
865 match self.descr.physical_type() {
866 Type::FIXED_LEN_BYTE_ARRAY
870 if !matches!(
871 self.descr.logical_type(),
872 Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
873 ) =>
874 {
875 true
876 }
877 Type::BYTE_ARRAY => true,
878 _ => false,
880 }
881 }
882
883 fn is_utf8(&self) -> bool {
885 self.get_descriptor().logical_type() == Some(LogicalType::String)
886 || self.get_descriptor().converted_type() == ConvertedType::UTF8
887 }
888
889 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
900 truncation_length
901 .filter(|l| data.len() > *l)
902 .and_then(|l|
903 if self.is_utf8() {
905 match str::from_utf8(data) {
906 Ok(str_data) => truncate_utf8(str_data, l),
907 Err(_) => Some(data[..l].to_vec()),
908 }
909 } else {
910 Some(data[..l].to_vec())
911 }
912 )
913 .map(|truncated| (truncated, true))
914 .unwrap_or_else(|| (data.to_vec(), false))
915 }
916
917 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
931 truncation_length
932 .filter(|l| data.len() > *l)
933 .and_then(|l|
934 if self.is_utf8() {
936 match str::from_utf8(data) {
937 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
938 Err(_) => increment(data[..l].to_vec()),
939 }
940 } else {
941 increment(data[..l].to_vec())
942 }
943 )
944 .map(|truncated| (truncated, true))
945 .unwrap_or_else(|| (data.to_vec(), false))
946 }
947
948 fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
951 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
952 match statistics {
953 Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
954 let (min, did_truncate_min) = self.truncate_min_value(
955 self.props.statistics_truncate_length(),
956 stats.min_bytes_opt().unwrap(),
957 );
958 let (max, did_truncate_max) = self.truncate_max_value(
959 self.props.statistics_truncate_length(),
960 stats.max_bytes_opt().unwrap(),
961 );
962 Statistics::ByteArray(
963 ValueStatistics::new(
964 Some(min.into()),
965 Some(max.into()),
966 stats.distinct_count(),
967 stats.null_count_opt(),
968 backwards_compatible_min_max,
969 )
970 .with_max_is_exact(!did_truncate_max)
971 .with_min_is_exact(!did_truncate_min),
972 )
973 }
974 Statistics::FixedLenByteArray(stats)
975 if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
976 {
977 let (min, did_truncate_min) = self.truncate_min_value(
978 self.props.statistics_truncate_length(),
979 stats.min_bytes_opt().unwrap(),
980 );
981 let (max, did_truncate_max) = self.truncate_max_value(
982 self.props.statistics_truncate_length(),
983 stats.max_bytes_opt().unwrap(),
984 );
985 Statistics::FixedLenByteArray(
986 ValueStatistics::new(
987 Some(min.into()),
988 Some(max.into()),
989 stats.distinct_count(),
990 stats.null_count_opt(),
991 backwards_compatible_min_max,
992 )
993 .with_max_is_exact(!did_truncate_max)
994 .with_min_is_exact(!did_truncate_min),
995 )
996 }
997 stats => stats,
998 }
999 }
1000
1001 fn add_data_page(&mut self) -> Result<()> {
1004 let values_data = self.encoder.flush_data_page()?;
1006
1007 let max_def_level = self.descr.max_def_level();
1008 let max_rep_level = self.descr.max_rep_level();
1009
1010 self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
1011
1012 let page_statistics = match (values_data.min_value, values_data.max_value) {
1013 (Some(min), Some(max)) => {
1014 update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
1016 update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
1017
1018 (self.statistics_enabled == EnabledStatistics::Page).then_some(
1019 ValueStatistics::new(
1020 Some(min),
1021 Some(max),
1022 None,
1023 Some(self.page_metrics.num_page_nulls),
1024 false,
1025 ),
1026 )
1027 }
1028 _ => None,
1029 };
1030
1031 self.update_column_offset_index(
1033 page_statistics.as_ref(),
1034 values_data.variable_length_bytes,
1035 );
1036
1037 self.column_metrics
1039 .update_from_page_metrics(&self.page_metrics);
1040 self.column_metrics
1041 .update_variable_length_bytes(values_data.variable_length_bytes);
1042
1043 let page_statistics = page_statistics
1045 .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
1046 .map(|stats| self.truncate_statistics(Statistics::from(stats)));
1047
1048 let compressed_page = match self.props.writer_version() {
1049 WriterVersion::PARQUET_1_0 => {
1050 let mut buffer = vec![];
1051
1052 if max_rep_level > 0 {
1053 buffer.extend_from_slice(
1054 &self.encode_levels_v1(
1055 Encoding::RLE,
1056 &self.rep_levels_sink[..],
1057 max_rep_level,
1058 )[..],
1059 );
1060 }
1061
1062 if max_def_level > 0 {
1063 buffer.extend_from_slice(
1064 &self.encode_levels_v1(
1065 Encoding::RLE,
1066 &self.def_levels_sink[..],
1067 max_def_level,
1068 )[..],
1069 );
1070 }
1071
1072 buffer.extend_from_slice(&values_data.buf);
1073 let uncompressed_size = buffer.len();
1074
1075 if let Some(ref mut cmpr) = self.compressor {
1076 let mut compressed_buf = Vec::with_capacity(uncompressed_size);
1077 cmpr.compress(&buffer[..], &mut compressed_buf)?;
1078 compressed_buf.shrink_to_fit();
1079 buffer = compressed_buf;
1080 }
1081
1082 let data_page = Page::DataPage {
1083 buf: buffer.into(),
1084 num_values: self.page_metrics.num_buffered_values,
1085 encoding: values_data.encoding,
1086 def_level_encoding: Encoding::RLE,
1087 rep_level_encoding: Encoding::RLE,
1088 statistics: page_statistics,
1089 };
1090
1091 CompressedPage::new(data_page, uncompressed_size)
1092 }
1093 WriterVersion::PARQUET_2_0 => {
1094 let mut rep_levels_byte_len = 0;
1095 let mut def_levels_byte_len = 0;
1096 let mut buffer = vec![];
1097
1098 if max_rep_level > 0 {
1099 let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
1100 rep_levels_byte_len = levels.len();
1101 buffer.extend_from_slice(&levels[..]);
1102 }
1103
1104 if max_def_level > 0 {
1105 let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
1106 def_levels_byte_len = levels.len();
1107 buffer.extend_from_slice(&levels[..]);
1108 }
1109
1110 let uncompressed_size =
1111 rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
1112
1113 let is_compressed = match self.compressor {
1115 Some(ref mut cmpr) => {
1116 let buffer_len = buffer.len();
1117 cmpr.compress(&values_data.buf, &mut buffer)?;
1118 if uncompressed_size <= buffer.len() - buffer_len {
1119 buffer.truncate(buffer_len);
1120 buffer.extend_from_slice(&values_data.buf);
1121 false
1122 } else {
1123 true
1124 }
1125 }
1126 None => {
1127 buffer.extend_from_slice(&values_data.buf);
1128 false
1129 }
1130 };
1131
1132 let data_page = Page::DataPageV2 {
1133 buf: buffer.into(),
1134 num_values: self.page_metrics.num_buffered_values,
1135 encoding: values_data.encoding,
1136 num_nulls: self.page_metrics.num_page_nulls as u32,
1137 num_rows: self.page_metrics.num_buffered_rows,
1138 def_levels_byte_len: def_levels_byte_len as u32,
1139 rep_levels_byte_len: rep_levels_byte_len as u32,
1140 is_compressed,
1141 statistics: page_statistics,
1142 };
1143
1144 CompressedPage::new(data_page, uncompressed_size)
1145 }
1146 };
1147
1148 if self.encoder.has_dictionary() {
1150 self.data_pages.push_back(compressed_page);
1151 } else {
1152 self.write_data_page(compressed_page)?;
1153 }
1154
1155 self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
1157
1158 self.rep_levels_sink.clear();
1160 self.def_levels_sink.clear();
1161 self.page_metrics.new_page();
1162
1163 Ok(())
1164 }
1165
1166 #[inline]
1169 fn flush_data_pages(&mut self) -> Result<()> {
1170 if self.page_metrics.num_buffered_values > 0 {
1172 self.add_data_page()?;
1173 }
1174
1175 while let Some(page) = self.data_pages.pop_front() {
1176 self.write_data_page(page)?;
1177 }
1178
1179 Ok(())
1180 }
1181
    /// Builds the [`ColumnChunkMetaData`] for everything written so far.
    ///
    /// Sizes, value count, encodings, page encoding stats and page offsets are always set.
    /// The data page offset is 0 if no data page was written.
    ///
    /// Unless statistics are disabled, the following are also attached: chunk statistics
    /// (truncated via `truncate_statistics`), the unencoded byte-array size, the level
    /// histograms, and any geospatial statistics the encoder flushes. The histograms are
    /// moved out of `column_metrics` with `take()`, so this is meant to be called once, at close.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // No data page offset recorded means no data page was written.
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings(self.encodings.iter().cloned().collect())
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        // NOTE(review): defined outside this view; presumably a no-op without the
        // `encryption` feature — confirm.
        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1236
1237 #[inline]
1239 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1240 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1241 encoder.put(levels);
1242 encoder.consume()
1243 }
1244
1245 #[inline]
1248 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1249 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1250 encoder.put(levels);
1251 encoder.consume()
1252 }
1253
1254 #[inline]
1256 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1257 self.encodings.insert(page.encoding());
1258 match self.encoding_stats.last_mut() {
1259 Some(encoding_stats)
1260 if encoding_stats.page_type == page.page_type()
1261 && encoding_stats.encoding == page.encoding() =>
1262 {
1263 encoding_stats.count += 1;
1264 }
1265 _ => {
1266 self.encoding_stats.push(PageEncodingStats {
1269 page_type: page.page_type(),
1270 encoding: page.encoding(),
1271 count: 1,
1272 });
1273 }
1274 }
1275 let page_spec = self.page_writer.write_page(page)?;
1276 if let Some(builder) = self.offset_index_builder.as_mut() {
1279 builder
1280 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1281 }
1282 self.update_metrics_for_page(page_spec);
1283 Ok(())
1284 }
1285
1286 #[inline]
1288 fn write_dictionary_page(&mut self) -> Result<()> {
1289 let compressed_page = {
1290 let mut page = self
1291 .encoder
1292 .flush_dict_page()?
1293 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1294
1295 let uncompressed_size = page.buf.len();
1296
1297 if let Some(ref mut cmpr) = self.compressor {
1298 let mut output_buf = Vec::with_capacity(uncompressed_size);
1299 cmpr.compress(&page.buf, &mut output_buf)?;
1300 page.buf = Bytes::from(output_buf);
1301 }
1302
1303 let dict_page = Page::DictionaryPage {
1304 buf: page.buf,
1305 num_values: page.num_values as u32,
1306 encoding: self.props.dictionary_page_encoding(),
1307 is_sorted: page.is_sorted,
1308 };
1309 CompressedPage::new(dict_page, uncompressed_size)
1310 };
1311
1312 self.encodings.insert(compressed_page.encoding());
1313 self.encoding_stats.push(PageEncodingStats {
1314 page_type: PageType::DICTIONARY_PAGE,
1315 encoding: compressed_page.encoding(),
1316 count: 1,
1317 });
1318 let page_spec = self.page_writer.write_page(compressed_page)?;
1319 self.update_metrics_for_page(page_spec);
1320 Ok(())
1322 }
1323
1324 #[inline]
1326 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1327 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1328 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1329 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1330
1331 match page_spec.page_type {
1332 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1333 self.column_metrics.total_num_values += page_spec.num_values as u64;
1334 if self.column_metrics.data_page_offset.is_none() {
1335 self.column_metrics.data_page_offset = Some(page_spec.offset);
1336 }
1337 }
1338 PageType::DICTIONARY_PAGE => {
1339 assert!(
1340 self.column_metrics.dictionary_page_offset.is_none(),
1341 "Dictionary offset is already set"
1342 );
1343 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1344 }
1345 _ => {}
1346 }
1347 }
1348
1349 #[inline]
1350 #[cfg(feature = "encryption")]
1351 fn set_column_chunk_encryption_properties(
1352 &self,
1353 builder: ColumnChunkMetaDataBuilder,
1354 ) -> ColumnChunkMetaDataBuilder {
1355 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1356 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1357 encryption_properties,
1358 &self.descr,
1359 ))
1360 } else {
1361 builder
1362 }
1363 }
1364
    /// Build without the `encryption` feature: there is no crypto metadata to attach, so
    /// the builder is returned unchanged.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1373}
1374
1375fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1376 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1377}
1378
1379fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1380 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1381}
1382
1383#[inline]
1384#[allow(clippy::eq_op)]
1385fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1386 match T::PHYSICAL_TYPE {
1387 Type::FLOAT | Type::DOUBLE => val != val,
1388 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1389 let val = val.as_bytes();
1390 let val = f16::from_le_bytes([val[0], val[1]]);
1391 val.is_nan()
1392 }
1393 _ => false,
1394 }
1395}
1396
1397fn update_stat<T: ParquetValueType, F>(
1402 descr: &ColumnDescriptor,
1403 val: &T,
1404 cur: &mut Option<T>,
1405 should_update: F,
1406) where
1407 F: Fn(&T) -> bool,
1408{
1409 if is_nan(descr, val) {
1410 return;
1411 }
1412
1413 if cur.as_ref().is_none_or(should_update) {
1414 *cur = Some(val.clone());
1415 }
1416}
1417
1418fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1420 match T::PHYSICAL_TYPE {
1421 Type::INT32 | Type::INT64 => {
1422 if let Some(LogicalType::Integer {
1423 is_signed: false, ..
1424 }) = descr.logical_type()
1425 {
1426 return compare_greater_unsigned_int(a, b);
1428 }
1429
1430 match descr.converted_type() {
1431 ConvertedType::UINT_8
1432 | ConvertedType::UINT_16
1433 | ConvertedType::UINT_32
1434 | ConvertedType::UINT_64 => {
1435 return compare_greater_unsigned_int(a, b);
1436 }
1437 _ => {}
1438 };
1439 }
1440 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1441 if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1442 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1443 }
1444 if let ConvertedType::DECIMAL = descr.converted_type() {
1445 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1446 }
1447 if let Some(LogicalType::Float16) = descr.logical_type() {
1448 return compare_greater_f16(a.as_bytes(), b.as_bytes());
1449 }
1450 }
1451
1452 _ => {}
1453 }
1454
1455 a > b
1457}
1458
1459fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1467 match (kind, props.writer_version()) {
1468 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1469 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1470 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1471 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1472 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1473 _ => Encoding::PLAIN,
1474 }
1475}
1476
1477fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1479 match (kind, props.writer_version()) {
1480 (Type::BOOLEAN, _) => false,
1482 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1484 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1485 _ => true,
1486 }
1487}
1488
1489#[inline]
1490fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
1491 a.as_u64().unwrap() > b.as_u64().unwrap()
1492}
1493
1494#[inline]
1495fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1496 let a = f16::from_le_bytes(a.try_into().unwrap());
1497 let b = f16::from_le_bytes(b.try_into().unwrap());
1498 a > b
1499}
1500
/// Returns `true` if the decimal encoded in `a` is strictly greater than the one in `b`.
///
/// Both inputs are read as big-endian two's-complement integers, so the sign is the top
/// bit of the first byte. The inputs may differ in length: a shorter value compares as if
/// it were sign-extended to the length of the longer one (e.g. `0xFF90` equals `0x90`).
///
/// An empty slice compares below any non-empty slice; two empty slices are equal.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Different signs decide immediately. So do differing first bytes when the lengths
    // match; with unequal lengths the first bytes are not aligned and need the
    // sign-extension handling below.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    if a_length == b_length {
        // Same length and same leading byte: the remaining bytes compare as unsigned.
        return a[1..] > b[1..];
    }

    // Same sign, different lengths. Split the longer value into its extra leading bytes
    // and a tail as long as the shorter value.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };
    let a_longer = a_length > b_length;
    let (lead, a_tail, b_tail) = if a_longer {
        let lead_length = a_length - b_length;
        (&a[..lead_length], &a[lead_length..], b)
    } else {
        let lead_length = b_length - a_length;
        (&b[..lead_length], a, &b[lead_length..])
    };

    if lead.iter().any(|&x| x != extension) {
        // The extra bytes carry magnitude, so the longer value lies further from zero:
        // it is larger when positive and smaller when negative.
        let negative_values = (first_a as i8) < 0;
        return if negative_values { !a_longer } else { a_longer };
    }

    // The extra bytes are pure sign extension. Both values share that sign, so the
    // equal-length tails decide the order when compared as unsigned big-endian bytes.
    a_tail > b_tail
}
1546
/// Returns the bytes of the longest non-empty prefix of `data` that is at most `length`
/// bytes long and ends on a UTF-8 character boundary.
///
/// Returns `None` when no such prefix exists (e.g. `length == 0`, or the first character
/// is wider than `length`).
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    (1..=length)
        .rev()
        .find(|&end| data.is_char_boundary(end))
        .map(|end| data.as_bytes()[..end].to_vec())
}
1556
1557fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1563 let lower_bound = length.saturating_sub(3);
1565 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1566 increment_utf8(data.get(..split)?)
1567}
1568
/// Increments the last character of `data` that can be bumped to the next code point
/// without changing its UTF-8 width.
///
/// Trailing characters that cannot be incremented are dropped from the result. A
/// character cannot be incremented when its successor:
/// - is not a valid `char` (a surrogate, or past `U+10FFFF`), or
/// - needs more bytes than the original character.
///
/// Returns `None` if no character can be incremented.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        let bumped = char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;
        let mut out = data.as_bytes()[..start + width].to_vec();
        bumped.encode_utf8(&mut out[start..]);
        Some(out)
    })
}
1590
/// Increments `data` as a big-endian unsigned integer, carrying from right to left.
///
/// Returns `None` if every byte is `u8::MAX`, since the value would overflow; this
/// includes the empty vector.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The rightmost byte below 0xFF absorbs the carry; every byte after it wraps to 0.
    let pos = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[pos] += 1;
    data[pos + 1..].fill(0);
    Some(data)
}
1606
1607#[cfg(test)]
1608mod tests {
1609 use crate::{
1610 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1611 schema::parser::parse_message_type,
1612 };
1613 use core::str;
1614 use rand::distr::uniform::SampleUniform;
1615 use std::{fs::File, sync::Arc};
1616
1617 use crate::column::{
1618 page::PageReader,
1619 reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1620 };
1621 use crate::file::writer::TrackedWrite;
1622 use crate::file::{
1623 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1624 };
1625 use crate::schema::types::{ColumnPath, Type as SchemaType};
1626 use crate::util::test_common::rand_gen::random_numbers_range;
1627
1628 use super::*;
1629
1630 #[test]
1631 fn test_column_writer_inconsistent_def_rep_length() {
1632 let page_writer = get_test_page_writer();
1633 let props = Default::default();
1634 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1635 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1636 assert!(res.is_err());
1637 if let Err(err) = res {
1638 assert_eq!(
1639 format!("{err}"),
1640 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1641 );
1642 }
1643 }
1644
1645 #[test]
1646 fn test_column_writer_invalid_def_levels() {
1647 let page_writer = get_test_page_writer();
1648 let props = Default::default();
1649 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1650 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1651 assert!(res.is_err());
1652 if let Err(err) = res {
1653 assert_eq!(
1654 format!("{err}"),
1655 "Parquet error: Definition levels are required, because max definition level = 1"
1656 );
1657 }
1658 }
1659
1660 #[test]
1661 fn test_column_writer_invalid_rep_levels() {
1662 let page_writer = get_test_page_writer();
1663 let props = Default::default();
1664 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1665 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1666 assert!(res.is_err());
1667 if let Err(err) = res {
1668 assert_eq!(
1669 format!("{err}"),
1670 "Parquet error: Repetition levels are required, because max repetition level = 1"
1671 );
1672 }
1673 }
1674
1675 #[test]
1676 fn test_column_writer_not_enough_values_to_write() {
1677 let page_writer = get_test_page_writer();
1678 let props = Default::default();
1679 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1680 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1681 assert!(res.is_err());
1682 if let Err(err) = res {
1683 assert_eq!(
1684 format!("{err}"),
1685 "Parquet error: Expected to write 4 values, but have only 2"
1686 );
1687 }
1688 }
1689
1690 #[test]
1691 fn test_column_writer_write_only_one_dictionary_page() {
1692 let page_writer = get_test_page_writer();
1693 let props = Default::default();
1694 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1695 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1696 writer.add_data_page().unwrap();
1698 writer.write_dictionary_page().unwrap();
1699 let err = writer.write_dictionary_page().unwrap_err().to_string();
1700 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1701 }
1702
1703 #[test]
1704 fn test_column_writer_error_when_writing_disabled_dictionary() {
1705 let page_writer = get_test_page_writer();
1706 let props = Arc::new(
1707 WriterProperties::builder()
1708 .set_dictionary_enabled(false)
1709 .build(),
1710 );
1711 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1712 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1713 let err = writer.write_dictionary_page().unwrap_err().to_string();
1714 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1715 }
1716
1717 #[test]
1718 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1719 let page_writer = get_test_page_writer();
1720 let props = Arc::new(
1721 WriterProperties::builder()
1722 .set_dictionary_enabled(true)
1723 .build(),
1724 );
1725 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1726 writer
1727 .write_batch(&[true, false, true, false], None, None)
1728 .unwrap();
1729
1730 let r = writer.close().unwrap();
1731 assert_eq!(r.bytes_written, 1);
1734 assert_eq!(r.rows_written, 4);
1735
1736 let metadata = r.metadata;
1737 assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1738 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1740 }
1741
    /// Checks the column encodings and page encoding stats for `BoolType` under both writer
    /// versions, with the `bool` argument set and cleared. Booleans never get a dictionary
    /// page: v1 pages are `PLAIN`, v2 pages are `RLE`.
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1777
    /// Checks the encodings chosen for `Int32Type`.
    ///
    /// With the `bool` argument set, both writer versions emit a `PLAIN` dictionary page
    /// followed by an `RLE_DICTIONARY` data page. With it cleared, v1 uses `PLAIN` and v2
    /// uses `DELTA_BINARY_PACKED`.
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1823
    /// Checks the encodings chosen for `Int64Type`. The expectations mirror the INT32
    /// case: dictionary pages when the `bool` argument is set; otherwise `PLAIN` (v1) or
    /// `DELTA_BINARY_PACKED` (v2).
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1869
    /// Checks the encodings chosen for `Int96Type`: dictionary pages when the `bool`
    /// argument is set; otherwise `PLAIN` under both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1911
    /// Checks the encodings chosen for `FloatType`: dictionary pages when the `bool`
    /// argument is set; otherwise `PLAIN` under both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1953
    /// Checks the encodings chosen for `DoubleType`: dictionary pages when the `bool`
    /// argument is set; otherwise `PLAIN` under both writer versions.
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1995
    /// Checks the encodings chosen for `ByteArrayType`: dictionary pages when the `bool`
    /// argument is set; otherwise `PLAIN` (v1) or `DELTA_BYTE_ARRAY` (v2).
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2041
    /// Checks the encodings chosen for `FixedLenByteArrayType`.
    ///
    /// Under v1 no dictionary page is produced even with the `bool` argument set, which
    /// matches `has_dictionary_support`. Under v2 the flag yields dictionary pages, and
    /// without it pages use `DELTA_BYTE_ARRAY`.
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2084
2085 #[test]
2086 fn test_column_writer_check_metadata() {
2087 let page_writer = get_test_page_writer();
2088 let props = Default::default();
2089 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2090 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2091
2092 let r = writer.close().unwrap();
2093 assert_eq!(r.bytes_written, 20);
2094 assert_eq!(r.rows_written, 4);
2095
2096 let metadata = r.metadata;
2097 assert_eq!(
2098 metadata.encodings(),
2099 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2100 );
2101 assert_eq!(metadata.num_values(), 4);
2102 assert_eq!(metadata.compressed_size(), 20);
2103 assert_eq!(metadata.uncompressed_size(), 20);
2104 assert_eq!(metadata.data_page_offset(), 0);
2105 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2106 if let Some(stats) = metadata.statistics() {
2107 assert_eq!(stats.null_count_opt(), Some(0));
2108 assert_eq!(stats.distinct_count_opt(), None);
2109 if let Statistics::Int32(stats) = stats {
2110 assert_eq!(stats.min_opt().unwrap(), &1);
2111 assert_eq!(stats.max_opt().unwrap(), &4);
2112 } else {
2113 panic!("expecting Statistics::Int32");
2114 }
2115 } else {
2116 panic!("metadata missing statistics");
2117 }
2118 }
2119
2120 #[test]
2121 fn test_column_writer_check_byte_array_min_max() {
2122 let page_writer = get_test_page_writer();
2123 let props = Default::default();
2124 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2125 writer
2126 .write_batch(
2127 &[
2128 ByteArray::from(vec![
2129 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2130 35u8, 231u8, 90u8, 0u8, 0u8,
2131 ]),
2132 ByteArray::from(vec![
2133 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2134 152u8, 177u8, 56u8, 0u8, 0u8,
2135 ]),
2136 ByteArray::from(vec![
2137 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2138 0u8,
2139 ]),
2140 ByteArray::from(vec![
2141 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2142 44u8, 0u8, 0u8,
2143 ]),
2144 ],
2145 None,
2146 None,
2147 )
2148 .unwrap();
2149 let metadata = writer.close().unwrap().metadata;
2150 if let Some(stats) = metadata.statistics() {
2151 if let Statistics::ByteArray(stats) = stats {
2152 assert_eq!(
2153 stats.min_opt().unwrap(),
2154 &ByteArray::from(vec![
2155 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2156 35u8, 231u8, 90u8, 0u8, 0u8,
2157 ])
2158 );
2159 assert_eq!(
2160 stats.max_opt().unwrap(),
2161 &ByteArray::from(vec![
2162 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2163 44u8, 0u8, 0u8,
2164 ])
2165 );
2166 } else {
2167 panic!("expecting Statistics::ByteArray");
2168 }
2169 } else {
2170 panic!("metadata missing statistics");
2171 }
2172 }
2173
2174 #[test]
2175 fn test_column_writer_uint32_converted_type_min_max() {
2176 let page_writer = get_test_page_writer();
2177 let props = Default::default();
2178 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2179 page_writer,
2180 0,
2181 0,
2182 props,
2183 );
2184 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2185 let metadata = writer.close().unwrap().metadata;
2186 if let Some(stats) = metadata.statistics() {
2187 if let Statistics::Int32(stats) = stats {
2188 assert_eq!(stats.min_opt().unwrap(), &0,);
2189 assert_eq!(stats.max_opt().unwrap(), &5,);
2190 } else {
2191 panic!("expecting Statistics::Int32");
2192 }
2193 } else {
2194 panic!("metadata missing statistics");
2195 }
2196 }
2197
2198 #[test]
2199 fn test_column_writer_precalculated_statistics() {
2200 let page_writer = get_test_page_writer();
2201 let props = Arc::new(
2202 WriterProperties::builder()
2203 .set_statistics_enabled(EnabledStatistics::Chunk)
2204 .build(),
2205 );
2206 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2207 writer
2208 .write_batch_with_statistics(
2209 &[1, 2, 3, 4],
2210 None,
2211 None,
2212 Some(&-17),
2213 Some(&9000),
2214 Some(55),
2215 )
2216 .unwrap();
2217
2218 let r = writer.close().unwrap();
2219 assert_eq!(r.bytes_written, 20);
2220 assert_eq!(r.rows_written, 4);
2221
2222 let metadata = r.metadata;
2223 assert_eq!(
2224 metadata.encodings(),
2225 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2226 );
2227 assert_eq!(metadata.num_values(), 4);
2228 assert_eq!(metadata.compressed_size(), 20);
2229 assert_eq!(metadata.uncompressed_size(), 20);
2230 assert_eq!(metadata.data_page_offset(), 0);
2231 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2232 if let Some(stats) = metadata.statistics() {
2233 assert_eq!(stats.null_count_opt(), Some(0));
2234 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2235 if let Statistics::Int32(stats) = stats {
2236 assert_eq!(stats.min_opt().unwrap(), &-17);
2237 assert_eq!(stats.max_opt().unwrap(), &9000);
2238 } else {
2239 panic!("expecting Statistics::Int32");
2240 }
2241 } else {
2242 panic!("metadata missing statistics");
2243 }
2244 }
2245
    /// Mixes a batch with computed statistics (values 1..=4) and a batch with
    /// caller-supplied statistics (min 5, max 7, distinct 3).
    ///
    /// Expected results:
    /// - chunk and page statistics cover 1..=7;
    /// - neither carries a distinct count;
    /// - the bytes read back contain one dictionary page and one data page.
    #[test]
    fn test_mixed_precomputed_statistics() {
        let mut buf = Vec::with_capacity(100);
        // `write` borrows `buf` mutably; it is dropped below before `buf` is read back.
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        // Release the borrow on `buf` so it can be moved into the reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .set_read_page_statistics(true)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        // Page header statistics were enabled, so the data page carries them too.
        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }
2304
    /// With statistics disabled and the v2 writer, neither the chunk metadata nor the
    /// `DataPageV2` header carries statistics. The page still reports its value, null and
    /// row counts.
    #[test]
    fn test_disabled_statistics() {
        let mut buf = Vec::with_capacity(100);
        // `write` borrows `buf` mutably; it is dropped below before `buf` is read back.
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        // Max definition level is 1: the four `1` levels carry the values, the two `0`
        // levels are nulls (six slots in total).
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        assert!(r.metadata.statistics().is_none());

        // Release the borrow on `buf` so it can be moved into the reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2360
2361 #[test]
2362 fn test_column_writer_empty_column_roundtrip() {
2363 let props = Default::default();
2364 column_roundtrip::<Int32Type>(props, &[], None, None);
2365 }
2366
2367 #[test]
2368 fn test_column_writer_non_nullable_values_roundtrip() {
2369 let props = Default::default();
2370 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2371 }
2372
2373 #[test]
2374 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2375 let props = Default::default();
2376 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2377 }
2378
2379 #[test]
2380 fn test_column_writer_nullable_repeated_values_roundtrip() {
2381 let props = Default::default();
2382 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2383 }
2384
2385 #[test]
2386 fn test_column_writer_dictionary_fallback_small_data_page() {
2387 let props = WriterProperties::builder()
2388 .set_dictionary_page_size_limit(32)
2389 .set_data_page_size_limit(32)
2390 .build();
2391 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2392 }
2393
2394 #[test]
2395 fn test_column_writer_small_write_batch_size() {
2396 for i in &[1usize, 2, 5, 10, 11, 1023] {
2397 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2398
2399 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2400 }
2401 }
2402
2403 #[test]
2404 fn test_column_writer_dictionary_disabled_v1() {
2405 let props = WriterProperties::builder()
2406 .set_writer_version(WriterVersion::PARQUET_1_0)
2407 .set_dictionary_enabled(false)
2408 .build();
2409 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2410 }
2411
2412 #[test]
2413 fn test_column_writer_dictionary_disabled_v2() {
2414 let props = WriterProperties::builder()
2415 .set_writer_version(WriterVersion::PARQUET_2_0)
2416 .set_dictionary_enabled(false)
2417 .build();
2418 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2419 }
2420
2421 #[test]
2422 fn test_column_writer_compression_v1() {
2423 let props = WriterProperties::builder()
2424 .set_writer_version(WriterVersion::PARQUET_1_0)
2425 .set_compression(Compression::SNAPPY)
2426 .build();
2427 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2428 }
2429
2430 #[test]
2431 fn test_column_writer_compression_v2() {
2432 let props = WriterProperties::builder()
2433 .set_writer_version(WriterVersion::PARQUET_2_0)
2434 .set_compression(Compression::SNAPPY)
2435 .build();
2436 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2437 }
2438
2439 #[test]
2440 fn test_column_writer_add_data_pages_with_dict() {
2441 let mut file = tempfile::tempfile().unwrap();
2444 let mut write = TrackedWrite::new(&mut file);
2445 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2446 let props = Arc::new(
2447 WriterProperties::builder()
2448 .set_data_page_size_limit(10)
2449 .set_write_batch_size(3) .build(),
2451 );
2452 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2453 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2454 writer.write_batch(data, None, None).unwrap();
2455 let r = writer.close().unwrap();
2456
2457 drop(write);
2458
2459 let props = ReaderProperties::builder()
2461 .set_backward_compatible_lz4(false)
2462 .build();
2463 let mut page_reader = Box::new(
2464 SerializedPageReader::new_with_properties(
2465 Arc::new(file),
2466 &r.metadata,
2467 r.rows_written as usize,
2468 None,
2469 Arc::new(props),
2470 )
2471 .unwrap(),
2472 );
2473 let mut res = Vec::new();
2474 while let Some(page) = page_reader.get_next_page().unwrap() {
2475 res.push((page.page_type(), page.num_values(), page.buffer().len()));
2476 }
2477 assert_eq!(
2478 res,
2479 vec![
2480 (PageType::DICTIONARY_PAGE, 10, 40),
2481 (PageType::DATA_PAGE, 9, 10),
2482 (PageType::DATA_PAGE, 1, 3),
2483 ]
2484 );
2485 assert_eq!(
2486 r.metadata.page_encoding_stats(),
2487 Some(&vec![
2488 PageEncodingStats {
2489 page_type: PageType::DICTIONARY_PAGE,
2490 encoding: Encoding::PLAIN,
2491 count: 1
2492 },
2493 PageEncodingStats {
2494 page_type: PageType::DATA_PAGE,
2495 encoding: Encoding::RLE_DICTIONARY,
2496 count: 2,
2497 }
2498 ])
2499 );
2500 }
2501
2502 #[test]
2503 fn test_bool_statistics() {
2504 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2505 assert!(!stats.is_min_max_backwards_compatible());
2508 if let Statistics::Boolean(stats) = stats {
2509 assert_eq!(stats.min_opt().unwrap(), &false);
2510 assert_eq!(stats.max_opt().unwrap(), &true);
2511 } else {
2512 panic!("expecting Statistics::Boolean, got {stats:?}");
2513 }
2514 }
2515
2516 #[test]
2517 fn test_int32_statistics() {
2518 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2519 assert!(stats.is_min_max_backwards_compatible());
2520 if let Statistics::Int32(stats) = stats {
2521 assert_eq!(stats.min_opt().unwrap(), &-2);
2522 assert_eq!(stats.max_opt().unwrap(), &3);
2523 } else {
2524 panic!("expecting Statistics::Int32, got {stats:?}");
2525 }
2526 }
2527
2528 #[test]
2529 fn test_int64_statistics() {
2530 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2531 assert!(stats.is_min_max_backwards_compatible());
2532 if let Statistics::Int64(stats) = stats {
2533 assert_eq!(stats.min_opt().unwrap(), &-2);
2534 assert_eq!(stats.max_opt().unwrap(), &3);
2535 } else {
2536 panic!("expecting Statistics::Int64, got {stats:?}");
2537 }
2538 }
2539
2540 #[test]
2541 fn test_int96_statistics() {
2542 let input = vec![
2543 Int96::from(vec![1, 20, 30]),
2544 Int96::from(vec![3, 20, 10]),
2545 Int96::from(vec![0, 20, 30]),
2546 Int96::from(vec![2, 20, 30]),
2547 ]
2548 .into_iter()
2549 .collect::<Vec<Int96>>();
2550
2551 let stats = statistics_roundtrip::<Int96Type>(&input);
2552 assert!(!stats.is_min_max_backwards_compatible());
2553 if let Statistics::Int96(stats) = stats {
2554 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2555 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2556 } else {
2557 panic!("expecting Statistics::Int96, got {stats:?}");
2558 }
2559 }
2560
2561 #[test]
2562 fn test_float_statistics() {
2563 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2564 assert!(stats.is_min_max_backwards_compatible());
2565 if let Statistics::Float(stats) = stats {
2566 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2567 assert_eq!(stats.max_opt().unwrap(), &3.0);
2568 } else {
2569 panic!("expecting Statistics::Float, got {stats:?}");
2570 }
2571 }
2572
2573 #[test]
2574 fn test_double_statistics() {
2575 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2576 assert!(stats.is_min_max_backwards_compatible());
2577 if let Statistics::Double(stats) = stats {
2578 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2579 assert_eq!(stats.max_opt().unwrap(), &3.0);
2580 } else {
2581 panic!("expecting Statistics::Double, got {stats:?}");
2582 }
2583 }
2584
2585 #[test]
2586 fn test_byte_array_statistics() {
2587 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2588 .iter()
2589 .map(|&s| s.into())
2590 .collect::<Vec<_>>();
2591
2592 let stats = statistics_roundtrip::<ByteArrayType>(&input);
2593 assert!(!stats.is_min_max_backwards_compatible());
2594 if let Statistics::ByteArray(stats) = stats {
2595 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2596 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2597 } else {
2598 panic!("expecting Statistics::ByteArray, got {stats:?}");
2599 }
2600 }
2601
2602 #[test]
2603 fn test_fixed_len_byte_array_statistics() {
2604 let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
2605 .iter()
2606 .map(|&s| ByteArray::from(s).into())
2607 .collect::<Vec<_>>();
2608
2609 let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2610 assert!(!stats.is_min_max_backwards_compatible());
2611 if let Statistics::FixedLenByteArray(stats) = stats {
2612 let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
2613 assert_eq!(stats.min_opt().unwrap(), &expected_min);
2614 let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
2615 assert_eq!(stats.max_opt().unwrap(), &expected_max);
2616 } else {
2617 panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2618 }
2619 }
2620
2621 #[test]
2622 fn test_column_writer_check_float16_min_max() {
2623 let input = [
2624 -f16::ONE,
2625 f16::from_f32(3.0),
2626 -f16::from_f32(2.0),
2627 f16::from_f32(2.0),
2628 ]
2629 .into_iter()
2630 .map(|s| ByteArray::from(s).into())
2631 .collect::<Vec<_>>();
2632
2633 let stats = float16_statistics_roundtrip(&input);
2634 assert!(stats.is_min_max_backwards_compatible());
2635 assert_eq!(
2636 stats.min_opt().unwrap(),
2637 &ByteArray::from(-f16::from_f32(2.0))
2638 );
2639 assert_eq!(
2640 stats.max_opt().unwrap(),
2641 &ByteArray::from(f16::from_f32(3.0))
2642 );
2643 }
2644
2645 #[test]
2646 fn test_column_writer_check_float16_nan_middle() {
2647 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2648 .into_iter()
2649 .map(|s| ByteArray::from(s).into())
2650 .collect::<Vec<_>>();
2651
2652 let stats = float16_statistics_roundtrip(&input);
2653 assert!(stats.is_min_max_backwards_compatible());
2654 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2655 assert_eq!(
2656 stats.max_opt().unwrap(),
2657 &ByteArray::from(f16::ONE + f16::ONE)
2658 );
2659 }
2660
2661 #[test]
2662 fn test_float16_statistics_nan_middle() {
2663 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2664 .into_iter()
2665 .map(|s| ByteArray::from(s).into())
2666 .collect::<Vec<_>>();
2667
2668 let stats = float16_statistics_roundtrip(&input);
2669 assert!(stats.is_min_max_backwards_compatible());
2670 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2671 assert_eq!(
2672 stats.max_opt().unwrap(),
2673 &ByteArray::from(f16::ONE + f16::ONE)
2674 );
2675 }
2676
2677 #[test]
2678 fn test_float16_statistics_nan_start() {
2679 let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2680 .into_iter()
2681 .map(|s| ByteArray::from(s).into())
2682 .collect::<Vec<_>>();
2683
2684 let stats = float16_statistics_roundtrip(&input);
2685 assert!(stats.is_min_max_backwards_compatible());
2686 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2687 assert_eq!(
2688 stats.max_opt().unwrap(),
2689 &ByteArray::from(f16::ONE + f16::ONE)
2690 );
2691 }
2692
2693 #[test]
2694 fn test_float16_statistics_nan_only() {
2695 let input = [f16::NAN, f16::NAN]
2696 .into_iter()
2697 .map(|s| ByteArray::from(s).into())
2698 .collect::<Vec<_>>();
2699
2700 let stats = float16_statistics_roundtrip(&input);
2701 assert!(stats.min_bytes_opt().is_none());
2702 assert!(stats.max_bytes_opt().is_none());
2703 assert!(stats.is_min_max_backwards_compatible());
2704 }
2705
2706 #[test]
2707 fn test_float16_statistics_zero_only() {
2708 let input = [f16::ZERO]
2709 .into_iter()
2710 .map(|s| ByteArray::from(s).into())
2711 .collect::<Vec<_>>();
2712
2713 let stats = float16_statistics_roundtrip(&input);
2714 assert!(stats.is_min_max_backwards_compatible());
2715 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2716 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2717 }
2718
2719 #[test]
2720 fn test_float16_statistics_neg_zero_only() {
2721 let input = [f16::NEG_ZERO]
2722 .into_iter()
2723 .map(|s| ByteArray::from(s).into())
2724 .collect::<Vec<_>>();
2725
2726 let stats = float16_statistics_roundtrip(&input);
2727 assert!(stats.is_min_max_backwards_compatible());
2728 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2729 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2730 }
2731
2732 #[test]
2733 fn test_float16_statistics_zero_min() {
2734 let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2735 .into_iter()
2736 .map(|s| ByteArray::from(s).into())
2737 .collect::<Vec<_>>();
2738
2739 let stats = float16_statistics_roundtrip(&input);
2740 assert!(stats.is_min_max_backwards_compatible());
2741 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2742 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2743 }
2744
2745 #[test]
2746 fn test_float16_statistics_neg_zero_max() {
2747 let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2748 .into_iter()
2749 .map(|s| ByteArray::from(s).into())
2750 .collect::<Vec<_>>();
2751
2752 let stats = float16_statistics_roundtrip(&input);
2753 assert!(stats.is_min_max_backwards_compatible());
2754 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2755 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2756 }
2757
2758 #[test]
2759 fn test_float_statistics_nan_middle() {
2760 let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2761 assert!(stats.is_min_max_backwards_compatible());
2762 if let Statistics::Float(stats) = stats {
2763 assert_eq!(stats.min_opt().unwrap(), &1.0);
2764 assert_eq!(stats.max_opt().unwrap(), &2.0);
2765 } else {
2766 panic!("expecting Statistics::Float");
2767 }
2768 }
2769
2770 #[test]
2771 fn test_float_statistics_nan_start() {
2772 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2773 assert!(stats.is_min_max_backwards_compatible());
2774 if let Statistics::Float(stats) = stats {
2775 assert_eq!(stats.min_opt().unwrap(), &1.0);
2776 assert_eq!(stats.max_opt().unwrap(), &2.0);
2777 } else {
2778 panic!("expecting Statistics::Float");
2779 }
2780 }
2781
2782 #[test]
2783 fn test_float_statistics_nan_only() {
2784 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2785 assert!(stats.min_bytes_opt().is_none());
2786 assert!(stats.max_bytes_opt().is_none());
2787 assert!(stats.is_min_max_backwards_compatible());
2788 assert!(matches!(stats, Statistics::Float(_)));
2789 }
2790
2791 #[test]
2792 fn test_float_statistics_zero_only() {
2793 let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2794 assert!(stats.is_min_max_backwards_compatible());
2795 if let Statistics::Float(stats) = stats {
2796 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2797 assert!(stats.min_opt().unwrap().is_sign_negative());
2798 assert_eq!(stats.max_opt().unwrap(), &0.0);
2799 assert!(stats.max_opt().unwrap().is_sign_positive());
2800 } else {
2801 panic!("expecting Statistics::Float");
2802 }
2803 }
2804
2805 #[test]
2806 fn test_float_statistics_neg_zero_only() {
2807 let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2808 assert!(stats.is_min_max_backwards_compatible());
2809 if let Statistics::Float(stats) = stats {
2810 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2811 assert!(stats.min_opt().unwrap().is_sign_negative());
2812 assert_eq!(stats.max_opt().unwrap(), &0.0);
2813 assert!(stats.max_opt().unwrap().is_sign_positive());
2814 } else {
2815 panic!("expecting Statistics::Float");
2816 }
2817 }
2818
2819 #[test]
2820 fn test_float_statistics_zero_min() {
2821 let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2822 assert!(stats.is_min_max_backwards_compatible());
2823 if let Statistics::Float(stats) = stats {
2824 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2825 assert!(stats.min_opt().unwrap().is_sign_negative());
2826 assert_eq!(stats.max_opt().unwrap(), &2.0);
2827 } else {
2828 panic!("expecting Statistics::Float");
2829 }
2830 }
2831
2832 #[test]
2833 fn test_float_statistics_neg_zero_max() {
2834 let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2835 assert!(stats.is_min_max_backwards_compatible());
2836 if let Statistics::Float(stats) = stats {
2837 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2838 assert_eq!(stats.max_opt().unwrap(), &0.0);
2839 assert!(stats.max_opt().unwrap().is_sign_positive());
2840 } else {
2841 panic!("expecting Statistics::Float");
2842 }
2843 }
2844
2845 #[test]
2846 fn test_double_statistics_nan_middle() {
2847 let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2848 assert!(stats.is_min_max_backwards_compatible());
2849 if let Statistics::Double(stats) = stats {
2850 assert_eq!(stats.min_opt().unwrap(), &1.0);
2851 assert_eq!(stats.max_opt().unwrap(), &2.0);
2852 } else {
2853 panic!("expecting Statistics::Double");
2854 }
2855 }
2856
2857 #[test]
2858 fn test_double_statistics_nan_start() {
2859 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2860 assert!(stats.is_min_max_backwards_compatible());
2861 if let Statistics::Double(stats) = stats {
2862 assert_eq!(stats.min_opt().unwrap(), &1.0);
2863 assert_eq!(stats.max_opt().unwrap(), &2.0);
2864 } else {
2865 panic!("expecting Statistics::Double");
2866 }
2867 }
2868
2869 #[test]
2870 fn test_double_statistics_nan_only() {
2871 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2872 assert!(stats.min_bytes_opt().is_none());
2873 assert!(stats.max_bytes_opt().is_none());
2874 assert!(matches!(stats, Statistics::Double(_)));
2875 assert!(stats.is_min_max_backwards_compatible());
2876 }
2877
2878 #[test]
2879 fn test_double_statistics_zero_only() {
2880 let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2881 assert!(stats.is_min_max_backwards_compatible());
2882 if let Statistics::Double(stats) = stats {
2883 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2884 assert!(stats.min_opt().unwrap().is_sign_negative());
2885 assert_eq!(stats.max_opt().unwrap(), &0.0);
2886 assert!(stats.max_opt().unwrap().is_sign_positive());
2887 } else {
2888 panic!("expecting Statistics::Double");
2889 }
2890 }
2891
2892 #[test]
2893 fn test_double_statistics_neg_zero_only() {
2894 let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2895 assert!(stats.is_min_max_backwards_compatible());
2896 if let Statistics::Double(stats) = stats {
2897 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2898 assert!(stats.min_opt().unwrap().is_sign_negative());
2899 assert_eq!(stats.max_opt().unwrap(), &0.0);
2900 assert!(stats.max_opt().unwrap().is_sign_positive());
2901 } else {
2902 panic!("expecting Statistics::Double");
2903 }
2904 }
2905
2906 #[test]
2907 fn test_double_statistics_zero_min() {
2908 let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2909 assert!(stats.is_min_max_backwards_compatible());
2910 if let Statistics::Double(stats) = stats {
2911 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2912 assert!(stats.min_opt().unwrap().is_sign_negative());
2913 assert_eq!(stats.max_opt().unwrap(), &2.0);
2914 } else {
2915 panic!("expecting Statistics::Double");
2916 }
2917 }
2918
2919 #[test]
2920 fn test_double_statistics_neg_zero_max() {
2921 let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2922 assert!(stats.is_min_max_backwards_compatible());
2923 if let Statistics::Double(stats) = stats {
2924 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2925 assert_eq!(stats.max_opt().unwrap(), &0.0);
2926 assert!(stats.max_opt().unwrap().is_sign_positive());
2927 } else {
2928 panic!("expecting Statistics::Double");
2929 }
2930 }
2931
2932 #[test]
2933 fn test_compare_greater_byte_array_decimals() {
2934 assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2935 assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2936 assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2937 assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2938 assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2939 assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2940 assert!(!compare_greater_byte_array_decimals(
2941 &[0u8, 1u8,],
2942 &[1u8, 0u8,],
2943 ),);
2944 assert!(!compare_greater_byte_array_decimals(
2945 &[255u8, 35u8, 0u8, 0u8,],
2946 &[0u8,],
2947 ),);
2948 assert!(compare_greater_byte_array_decimals(
2949 &[0u8,],
2950 &[255u8, 35u8, 0u8, 0u8,],
2951 ),);
2952 }
2953
2954 #[test]
2955 fn test_column_index_with_null_pages() {
2956 let page_writer = get_test_page_writer();
2958 let props = Default::default();
2959 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2960 writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2961
2962 let r = writer.close().unwrap();
2963 assert!(r.column_index.is_some());
2964 let col_idx = r.column_index.unwrap();
2965 let col_idx = match col_idx {
2966 ColumnIndexMetaData::INT32(col_idx) => col_idx,
2967 _ => panic!("wrong stats type"),
2968 };
2969 assert!(col_idx.is_null_page(0));
2971 assert!(col_idx.min_value(0).is_none());
2973 assert!(col_idx.max_value(0).is_none());
2974 assert!(col_idx.null_count(0).is_some());
2976 assert_eq!(col_idx.null_count(0), Some(4));
2977 assert!(col_idx.repetition_level_histogram(0).is_none());
2979 assert!(col_idx.definition_level_histogram(0).is_some());
2981 assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
2982 }
2983
2984 #[test]
2985 fn test_column_offset_index_metadata() {
2986 let page_writer = get_test_page_writer();
2989 let props = Default::default();
2990 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2991 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2992 writer.flush_data_pages().unwrap();
2994 writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2996
2997 let r = writer.close().unwrap();
2998 let column_index = r.column_index.unwrap();
2999 let offset_index = r.offset_index.unwrap();
3000
3001 assert_eq!(8, r.rows_written);
3002
3003 let column_index = match column_index {
3005 ColumnIndexMetaData::INT32(column_index) => column_index,
3006 _ => panic!("wrong stats type"),
3007 };
3008 assert_eq!(2, column_index.num_pages());
3009 assert_eq!(2, offset_index.page_locations.len());
3010 assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3011 for idx in 0..2 {
3012 assert!(!column_index.is_null_page(idx));
3013 assert_eq!(0, column_index.null_count(0).unwrap());
3014 }
3015
3016 if let Some(stats) = r.metadata.statistics() {
3017 assert_eq!(stats.null_count_opt(), Some(0));
3018 assert_eq!(stats.distinct_count_opt(), None);
3019 if let Statistics::Int32(stats) = stats {
3020 assert_eq!(stats.min_opt(), column_index.min_value(1));
3024 assert_eq!(stats.max_opt(), column_index.max_value(1));
3025 } else {
3026 panic!("expecting Statistics::Int32");
3027 }
3028 } else {
3029 panic!("metadata missing statistics");
3030 }
3031
3032 assert_eq!(0, offset_index.page_locations[0].first_row_index);
3034 assert_eq!(4, offset_index.page_locations[1].first_row_index);
3035 }
3036
3037 #[test]
3039 fn test_column_offset_index_metadata_truncating() {
3040 let page_writer = get_test_page_writer();
3043 let props = WriterProperties::builder()
3044 .set_statistics_truncate_length(None) .build()
3046 .into();
3047 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3048
3049 let mut data = vec![FixedLenByteArray::default(); 3];
3050 data[0].set_data(Bytes::from(vec![97_u8; 200]));
3052 data[1].set_data(Bytes::from(vec![112_u8; 200]));
3054 data[2].set_data(Bytes::from(vec![98_u8; 200]));
3055
3056 writer.write_batch(&data, None, None).unwrap();
3057
3058 writer.flush_data_pages().unwrap();
3059
3060 let r = writer.close().unwrap();
3061 let column_index = r.column_index.unwrap();
3062 let offset_index = r.offset_index.unwrap();
3063
3064 let column_index = match column_index {
3065 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3066 _ => panic!("wrong stats type"),
3067 };
3068
3069 assert_eq!(3, r.rows_written);
3070
3071 assert_eq!(1, column_index.num_pages());
3073 assert_eq!(1, offset_index.page_locations.len());
3074 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3075 assert!(!column_index.is_null_page(0));
3076 assert_eq!(Some(0), column_index.null_count(0));
3077
3078 if let Some(stats) = r.metadata.statistics() {
3079 assert_eq!(stats.null_count_opt(), Some(0));
3080 assert_eq!(stats.distinct_count_opt(), None);
3081 if let Statistics::FixedLenByteArray(stats) = stats {
3082 let column_index_min_value = column_index.min_value(0).unwrap();
3083 let column_index_max_value = column_index.max_value(0).unwrap();
3084
3085 assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
3087 assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);
3088
3089 assert_eq!(
3090 column_index_min_value.len(),
3091 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3092 );
3093 assert_eq!(column_index_min_value, &[97_u8; 64]);
3094 assert_eq!(
3095 column_index_max_value.len(),
3096 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3097 );
3098
3099 assert_eq!(
3101 *column_index_max_value.last().unwrap(),
3102 *column_index_max_value.first().unwrap() + 1
3103 );
3104 } else {
3105 panic!("expecting Statistics::FixedLenByteArray");
3106 }
3107 } else {
3108 panic!("metadata missing statistics");
3109 }
3110 }
3111
3112 #[test]
3113 fn test_column_offset_index_truncating_spec_example() {
3114 let page_writer = get_test_page_writer();
3117
3118 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3120 let props = Arc::new(builder.build());
3121 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3122
3123 let mut data = vec![FixedLenByteArray::default(); 1];
3124 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3126
3127 writer.write_batch(&data, None, None).unwrap();
3128
3129 writer.flush_data_pages().unwrap();
3130
3131 let r = writer.close().unwrap();
3132 let column_index = r.column_index.unwrap();
3133 let offset_index = r.offset_index.unwrap();
3134
3135 let column_index = match column_index {
3136 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3137 _ => panic!("wrong stats type"),
3138 };
3139
3140 assert_eq!(1, r.rows_written);
3141
3142 assert_eq!(1, column_index.num_pages());
3144 assert_eq!(1, offset_index.page_locations.len());
3145 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3146 assert!(!column_index.is_null_page(0));
3147 assert_eq!(Some(0), column_index.null_count(0));
3148
3149 if let Some(stats) = r.metadata.statistics() {
3150 assert_eq!(stats.null_count_opt(), Some(0));
3151 assert_eq!(stats.distinct_count_opt(), None);
3152 if let Statistics::FixedLenByteArray(_stats) = stats {
3153 let column_index_min_value = column_index.min_value(0).unwrap();
3154 let column_index_max_value = column_index.max_value(0).unwrap();
3155
3156 assert_eq!(column_index_min_value.len(), 1);
3157 assert_eq!(column_index_max_value.len(), 1);
3158
3159 assert_eq!("B".as_bytes(), column_index_min_value);
3160 assert_eq!("C".as_bytes(), column_index_max_value);
3161
3162 assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3163 assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3164 } else {
3165 panic!("expecting Statistics::FixedLenByteArray");
3166 }
3167 } else {
3168 panic!("metadata missing statistics");
3169 }
3170 }
3171
3172 #[test]
3173 fn test_float16_min_max_no_truncation() {
3174 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3176 let props = Arc::new(builder.build());
3177 let page_writer = get_test_page_writer();
3178 let mut writer = get_test_float16_column_writer(page_writer, props);
3179
3180 let expected_value = f16::PI.to_le_bytes().to_vec();
3181 let data = vec![ByteArray::from(expected_value.clone()).into()];
3182 writer.write_batch(&data, None, None).unwrap();
3183 writer.flush_data_pages().unwrap();
3184
3185 let r = writer.close().unwrap();
3186
3187 let column_index = r.column_index.unwrap();
3190 let column_index = match column_index {
3191 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3192 _ => panic!("wrong stats type"),
3193 };
3194 let column_index_min_bytes = column_index.min_value(0).unwrap();
3195 let column_index_max_bytes = column_index.max_value(0).unwrap();
3196 assert_eq!(expected_value, column_index_min_bytes);
3197 assert_eq!(expected_value, column_index_max_bytes);
3198
3199 let stats = r.metadata.statistics().unwrap();
3201 if let Statistics::FixedLenByteArray(stats) = stats {
3202 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3203 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3204 assert_eq!(expected_value, stats_min_bytes);
3205 assert_eq!(expected_value, stats_max_bytes);
3206 } else {
3207 panic!("expecting Statistics::FixedLenByteArray");
3208 }
3209 }
3210
3211 #[test]
3212 fn test_decimal_min_max_no_truncation() {
3213 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3215 let props = Arc::new(builder.build());
3216 let page_writer = get_test_page_writer();
3217 let mut writer =
3218 get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3219
3220 let expected_value = vec![
3221 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3222 231u8, 90u8, 0u8, 0u8,
3223 ];
3224 let data = vec![ByteArray::from(expected_value.clone()).into()];
3225 writer.write_batch(&data, None, None).unwrap();
3226 writer.flush_data_pages().unwrap();
3227
3228 let r = writer.close().unwrap();
3229
3230 let column_index = r.column_index.unwrap();
3233 let column_index = match column_index {
3234 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3235 _ => panic!("wrong stats type"),
3236 };
3237 let column_index_min_bytes = column_index.min_value(0).unwrap();
3238 let column_index_max_bytes = column_index.max_value(0).unwrap();
3239 assert_eq!(expected_value, column_index_min_bytes);
3240 assert_eq!(expected_value, column_index_max_bytes);
3241
3242 let stats = r.metadata.statistics().unwrap();
3244 if let Statistics::FixedLenByteArray(stats) = stats {
3245 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3246 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3247 assert_eq!(expected_value, stats_min_bytes);
3248 assert_eq!(expected_value, stats_max_bytes);
3249 } else {
3250 panic!("expecting Statistics::FixedLenByteArray");
3251 }
3252 }
3253
3254 #[test]
3255 fn test_statistics_truncating_byte_array_default() {
3256 let page_writer = get_test_page_writer();
3257
3258 let props = WriterProperties::builder().build().into();
3260 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3261
3262 let mut data = vec![ByteArray::default(); 1];
3263 data[0].set_data(Bytes::from(String::from(
3264 "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3265 )));
3266 writer.write_batch(&data, None, None).unwrap();
3267 writer.flush_data_pages().unwrap();
3268
3269 let r = writer.close().unwrap();
3270
3271 assert_eq!(1, r.rows_written);
3272
3273 let stats = r.metadata.statistics().expect("statistics");
3274 if let Statistics::ByteArray(_stats) = stats {
3275 let min_value = _stats.min_opt().unwrap();
3276 let max_value = _stats.max_opt().unwrap();
3277
3278 assert!(!_stats.min_is_exact());
3279 assert!(!_stats.max_is_exact());
3280
3281 let expected_len = 64;
3282 assert_eq!(min_value.len(), expected_len);
3283 assert_eq!(max_value.len(), expected_len);
3284
3285 let expected_min =
3286 "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3287 assert_eq!(expected_min, min_value.as_bytes());
3288 let expected_max =
3290 "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3291 assert_eq!(expected_max, max_value.as_bytes());
3292 } else {
3293 panic!("expecting Statistics::ByteArray");
3294 }
3295 }
3296
#[test]
fn test_statistics_truncating_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    writer.write_batch(&[value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    assert_eq!(1, result.rows_written);

    let stats = result.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);

    let Statistics::ByteArray(byte_stats) = stats else {
        panic!("expecting Statistics::ByteArray");
    };

    let min_value = byte_stats.min_opt().unwrap();
    let max_value = byte_stats.max_opt().unwrap();

    assert!(!byte_stats.min_is_exact());
    assert!(!byte_stats.max_is_exact());

    assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
    assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

    // One byte survives: min keeps 'B', max bumps it to 'C'.
    assert_eq!("B".as_bytes(), min_value.as_bytes());
    assert_eq!("C".as_bytes(), max_value.as_bytes());
}
3340
/// Truncated FIXED_LEN_BYTE_ARRAY statistics (here a 16-byte big-endian pseudo
/// decimal) must still bracket the written value once read back as an `i128`.
#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
    /// Zero-fills a big-endian prefix back out to 16 bytes and reads it as an `i128`.
    ///
    /// Panics if `prefix` is longer than 16 bytes.
    fn widen_be_prefix(prefix: &[u8]) -> i128 {
        let mut bytes = [0u8; 16];
        bytes[..prefix.len()].copy_from_slice(prefix);
        i128::from_be_bytes(bytes)
    }

    const TEST_TRUNCATE_LENGTH: usize = 1;

    const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
    const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

    // With a one-byte truncation, min keeps the leading byte and max increments it.
    const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
    const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
        [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer =
        get_test_column_writer::<FixedLenByteArrayType>(get_test_page_writer(), 0, 0, props);

    let mut data = vec![FixedLenByteArray::default(); 1];
    data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);

    let Statistics::FixedLenByteArray(flba_stats) = stats else {
        panic!("expecting Statistics::FixedLenByteArray");
    };

    let min_value = flba_stats.min_opt().unwrap();
    let max_value = flba_stats.max_opt().unwrap();

    assert!(!flba_stats.min_is_exact());
    assert!(!flba_stats.max_is_exact());

    assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
    assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

    assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
    assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

    // Widening the truncated bounds back to full width must still bracket the value.
    let reconstructed_min = widen_be_prefix(min_value.as_bytes());
    let reconstructed_max = widen_be_prefix(max_value.as_bytes());

    assert!(
        reconstructed_min <= PSEUDO_DECIMAL_VALUE,
        "min bound {reconstructed_min} exceeds value {PSEUDO_DECIMAL_VALUE}"
    );
    assert!(
        reconstructed_max >= PSEUDO_DECIMAL_VALUE,
        "max bound {reconstructed_max} is below value {PSEUDO_DECIMAL_VALUE}"
    );
}
3436
#[test]
fn test_send() {
    // Compile-time check: a typed column writer can be moved across threads.
    fn assert_send<W: Send>() {}
    assert_send::<ColumnWriterImpl<Int32Type>>();
}
3442
#[test]
fn test_increment() {
    let incremented = |bytes: Vec<u8>| increment(bytes).unwrap();

    // Plain increment of the last byte.
    assert_eq!(&incremented(vec![0, 0, 0]), &[0, 0, 1]);
    // A carry ripples through trailing 0xFF bytes.
    assert_eq!(&incremented(vec![0, 255, 255]), &[1, 0, 0]);
    // All-0xFF input has no larger value of the same length.
    assert!(increment(vec![255, 255, 255]).is_none());
}
3456
#[test]
fn test_increment_utf8() {
    // Increments `input` and checks three things:
    // - the result is valid UTF-8;
    // - it equals `expected`;
    // - it sorts strictly after `input`, both as `str` and as `ByteArray`.
    let check = |input: &str, expected: &str| {
        let Ok(bumped) = String::from_utf8(increment_utf8(input).unwrap()) else {
            panic!("Expected incremented UTF8 string to also be valid.");
        };
        assert_eq!(bumped, expected);
        assert!(*bumped > *input);

        let mut greater = ByteArray::new();
        greater.set_data(Bytes::from(bumped));
        let mut original = ByteArray::new();
        original.set_data(Bytes::from(input.as_bytes().to_vec()));
        assert!(greater > original);
    };

    // Behaviour the cases below pin down:
    // - a trailing character at the top of its encoded width is dropped, and the
    //   character before it is incremented instead;
    // - when no character can be incremented, the result is `None`.

    // 1-byte (ASCII) characters.
    check("hello", "hellp");
    check("a\u{7f}", "b");
    assert!(increment_utf8("\u{7f}\u{7f}").is_none());

    // Multi-code-point emoji sequence: only the final code point changes.
    check("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

    // 2-byte characters.
    check("éééé", "éééê");
    check("\u{ff}\u{ff}", "\u{ff}\u{100}");
    check("a\u{7ff}", "b");
    assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

    // 3-byte characters.
    check("ࠀࠀ", "ࠀࠁ");
    check("a\u{ffff}", "b");
    assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

    // 4-byte characters.
    check("𐀀𐀀", "𐀀𐀁");
    check("a\u{10ffff}", "b");
    assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

    // U+D7FF + 1 would be a surrogate, which is not a valid `char`.
    check("a\u{D7FF}", "b");
}
3523
#[test]
fn test_truncate_utf8() {
    let hearts = "❤️🧡💛💚💙💜";

    // A limit equal to the input length keeps every byte.
    let whole = truncate_utf8(hearts, hearts.len()).unwrap();
    assert_eq!(whole.len(), hearts.len());
    assert_eq!(&whole, hearts.as_bytes());

    // Byte 13 falls inside 💛, so truncation backs off to the previous char boundary.
    let prefix = truncate_utf8(hearts, 13).unwrap();
    assert_eq!(prefix.len(), 10);
    assert_eq!(&prefix, "❤️🧡".as_bytes());

    // U+0836 is three bytes long, so no non-empty prefix fits in one byte.
    assert!(truncate_utf8("\u{0836}", 1).is_none());

    // Each case is (input, byte limit, expected truncated-and-incremented result).
    let cases: [(&str, usize, Option<&str>); 8] = [
        ("yyyyyyyyy", 8, Some("yyyyyyyz")),
        ("ééééé", 7, Some("ééê")),
        (
            "\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}",
            8,
            Some("\u{ff}\u{ff}\u{ff}\u{100}"),
        ),
        ("߿߿߿߿߿", 8, None),
        ("ࠀࠀࠀࠀ", 8, Some("ࠀࠁ")),
        ("\u{ffff}\u{ffff}\u{ffff}", 8, None),
        ("𐀀𐀀𐀀𐀀", 9, Some("𐀀𐀁")),
        ("\u{10ffff}\u{10ffff}", 8, None),
    ];
    for (input, limit, expected) in cases {
        assert_eq!(
            truncate_and_increment_utf8(input, limit).as_deref(),
            expected.map(str::as_bytes),
            "truncate_and_increment_utf8({input:?}, {limit})"
        );
    }
}
3575
#[test]
fn test_byte_array_truncate_invalid_utf8_statistics() {
    let message_type = "\nmessage test_schema {\nOPTIONAL BYTE_ARRAY a (UTF8);\n}\n";
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // 0x80 is never a valid standalone UTF-8 byte, so these UTF8-annotated values are
    // not valid strings.
    let data = vec![ByteArray::from(vec![128u8; 32]); 7];
    // Ten slots, seven of them defined: one per value in `data`.
    let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];

    let file: File = tempfile::tempfile().unwrap();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Chunk)
            .set_statistics_truncate_length(Some(8))
            .build(),
    );

    let mut file_writer = SerializedFileWriter::new(&file, schema, props).unwrap();
    let mut row_group_writer = file_writer.next_row_group().unwrap();
    let mut column_writer = row_group_writer.next_column().unwrap().unwrap();
    column_writer
        .typed::<ByteArrayType>()
        .write_batch(&data, Some(&def_levels), None)
        .unwrap();
    column_writer.close().unwrap();
    row_group_writer.close().unwrap();

    let file_metadata = file_writer.close().unwrap();
    let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
    assert!(!stats.max_is_exact());

    // The max bound keeps 8 bytes and increments the last one as a binary value.
    let mut expected_max = vec![128u8; 7];
    expected_max.push(129);
    assert_eq!(stats.max_bytes_opt().map(|v| v.to_vec()), Some(expected_max));
}
3618
#[test]
fn test_increment_max_binary_chars() {
    // The carry ripples left through trailing 0xFF bytes, stopping at the first
    // byte that can absorb it.
    let bumped = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]).unwrap();
    assert_eq!(&bumped, &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);

    // When every byte is 0xFF there is nothing left to increment.
    assert!(increment(vec![0xFF, 0xFF, 0xFF]).is_none());
}
3627
#[test]
fn test_no_column_index_when_stats_disabled() {
    // With statistics disabled, no column index is produced, but an offset index still is.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten null slots (definition level 0), so there are no values.
    let no_values: Vec<i32> = Vec::new();
    let all_null = [0i16; 10];
    writer.write_batch(&no_values, Some(&all_null), None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    assert!(result.offset_index.is_some());
    assert!(result.column_index.is_none());
}
3651
#[test]
fn test_no_offset_index_when_disabled() {
    // With statistics off and the offset index explicitly disabled, neither index is produced.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_offset_index_disabled(true)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten null slots (definition level 0), so there are no values.
    let no_values: Vec<i32> = Vec::new();
    let all_null = [0i16; 10];
    writer.write_batch(&no_values, Some(&all_null), None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    assert!(result.offset_index.is_none());
    assert!(result.column_index.is_none());
}
3674
#[test]
fn test_offset_index_overridden() {
    // Page-level statistics produce a column index, and the assertions below expect
    // the offset index to be written anyway, even though it was disabled.
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_offset_index_disabled(true)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten null slots (definition level 0), so there are no values.
    let no_values: Vec<i32> = Vec::new();
    let all_null = [0i16; 10];
    writer.write_batch(&no_values, Some(&all_null), None).unwrap();
    writer.flush_data_pages().unwrap();

    let result = writer.close().unwrap();
    assert!(result.offset_index.is_some());
    assert!(result.column_index.is_some());
}
3697
#[test]
fn test_boundary_order() -> Result<()> {
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));

    // Writes each inner slice as its own page (`None` = null slot) and returns the
    // boundary order recorded in the resulting column index.
    let boundary_order_of = |pages: &[&[Option<i32>]]| {
        write_multiple_pages::<Int32Type>(&descr, pages)
            .map(|result| result.column_index.unwrap().get_boundary_order())
    };

    // Page mins and maxes never decrease; the null-only page does not break the order.
    assert_eq!(
        boundary_order_of(&[
            &[Some(-10), Some(10)],
            &[Some(-5), Some(11)],
            &[None],
            &[Some(-5), Some(11)],
        ])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // Page mins and maxes never increase.
    assert_eq!(
        boundary_order_of(&[
            &[Some(10), Some(11)],
            &[Some(5), Some(11)],
            &[None],
            &[Some(-5), Some(0)],
        ])?,
        Some(BoundaryOrder::DESCENDING)
    );

    // Identical non-null pages count as ascending.
    assert_eq!(
        boundary_order_of(&[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // Only null pages.
    assert_eq!(
        boundary_order_of(&[&[None], &[None], &[None]])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // A single page.
    assert_eq!(
        boundary_order_of(&[&[Some(-10), Some(10)]])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // A single page followed by a null-only page.
    assert_eq!(
        boundary_order_of(&[&[Some(-10), Some(10)], &[None]])?,
        Some(BoundaryOrder::ASCENDING)
    );

    // Rises and then falls.
    assert_eq!(
        boundary_order_of(&[
            &[Some(10), Some(11)],
            &[Some(11), Some(16)],
            &[None],
            &[Some(-5), Some(0)],
        ])?,
        Some(BoundaryOrder::UNORDERED)
    );

    // Mins rise while maxes fall.
    assert_eq!(
        boundary_order_of(&[
            &[Some(1), Some(9)],
            &[Some(2), Some(8)],
            &[None],
            &[Some(3), Some(7)],
        ])?,
        Some(BoundaryOrder::UNORDERED)
    );

    Ok(())
}
3805
#[test]
fn test_boundary_order_logical_type() -> Result<()> {
    // The same 2-byte column, once annotated as Float16 and once as raw bytes.
    let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
    let fba_descr = {
        let tpe = SchemaType::primitive_type_builder(
            "col",
            FixedLenByteArrayType::get_physical_type(),
        )
        .with_length(2)
        .build()?;
        Arc::new(ColumnDescriptor::new(
            Arc::new(tpe),
            1,
            0,
            ColumnPath::from("col"),
        ))
    };

    // Four single-value pages holding the f16 values 1, 0, -0, -1.
    let values: &[&[Option<FixedLenByteArray>]] = &[
        &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
        &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
        &[Some(FixedLenByteArray::from(ByteArray::from(
            f16::NEG_ZERO,
        )))],
        &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
    ];

    // Writes `values` under `descr` and returns the recorded boundary order.
    let boundary_order_of = |descr: &Arc<ColumnDescriptor>| {
        write_multiple_pages::<FixedLenByteArrayType>(descr, values)
            .map(|result| result.column_index.unwrap().get_boundary_order())
    };

    // Compared as Float16 values, the pages descend.
    assert_eq!(
        boundary_order_of(&f16_descr)?,
        Some(BoundaryOrder::DESCENDING)
    );
    // Compared as raw bytes, the same pages have no consistent order.
    assert_eq!(
        boundary_order_of(&fba_descr)?,
        Some(BoundaryOrder::UNORDERED)
    );

    Ok(())
}
3855
#[test]
fn test_interval_stats_should_not_have_min_max() {
    // Three 12-byte INTERVAL values that differ only in their final byte (0, 1, 2).
    let input: Vec<FixedLenByteArray> = (0u8..3)
        .map(|last| {
            let mut bytes = vec![0u8; 12];
            bytes[11] = last;
            ByteArray::from(bytes).into()
        })
        .collect();

    let mut writer = get_test_interval_column_writer(get_test_page_writer());
    writer.write_batch(&input, None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };

    // Statistics exist, but no min/max is recorded for INTERVAL columns.
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
}
3880
#[test]
#[cfg(feature = "arrow")]
fn test_column_writer_get_estimated_total_bytes() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    // Nothing written yet.
    assert_eq!(writer.get_estimated_total_bytes(), 0);

    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.add_data_page().unwrap();
    let after_first_page = writer.get_estimated_total_bytes();
    assert_eq!(after_first_page, 20);

    // The second page adds 21 bytes on top of the first page's 20.
    writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
    writer.add_data_page().unwrap();
    let after_second_page = writer.get_estimated_total_bytes();
    assert_eq!(after_second_page, 20 + 21);
}
3900
3901 fn write_multiple_pages<T: DataType>(
3902 column_descr: &Arc<ColumnDescriptor>,
3903 pages: &[&[Option<T::T>]],
3904 ) -> Result<ColumnCloseResult> {
3905 let column_writer = get_column_writer(
3906 column_descr.clone(),
3907 Default::default(),
3908 get_test_page_writer(),
3909 );
3910 let mut writer = get_typed_column_writer::<T>(column_writer);
3911
3912 for &page in pages {
3913 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3914 let def_levels = page
3915 .iter()
3916 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3917 .collect::<Vec<_>>();
3918 writer.write_batch(&values, Some(&def_levels), None)?;
3919 writer.flush_data_pages()?;
3920 }
3921
3922 writer.close()
3923 }
3924
3925 fn column_roundtrip_random<T: DataType>(
3929 props: WriterProperties,
3930 max_size: usize,
3931 min_value: T::T,
3932 max_value: T::T,
3933 max_def_level: i16,
3934 max_rep_level: i16,
3935 ) where
3936 T::T: PartialOrd + SampleUniform + Copy,
3937 {
3938 let mut num_values: usize = 0;
3939
3940 let mut buf: Vec<i16> = Vec::new();
3941 let def_levels = if max_def_level > 0 {
3942 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3943 for &dl in &buf[..] {
3944 if dl == max_def_level {
3945 num_values += 1;
3946 }
3947 }
3948 Some(&buf[..])
3949 } else {
3950 num_values = max_size;
3951 None
3952 };
3953
3954 let mut buf: Vec<i16> = Vec::new();
3955 let rep_levels = if max_rep_level > 0 {
3956 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3957 buf[0] = 0; Some(&buf[..])
3959 } else {
3960 None
3961 };
3962
3963 let mut values: Vec<T::T> = Vec::new();
3964 random_numbers_range(num_values, min_value, max_value, &mut values);
3965
3966 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3967 }
3968
3969 fn column_roundtrip<T: DataType>(
3971 props: WriterProperties,
3972 values: &[T::T],
3973 def_levels: Option<&[i16]>,
3974 rep_levels: Option<&[i16]>,
3975 ) {
3976 let mut file = tempfile::tempfile().unwrap();
3977 let mut write = TrackedWrite::new(&mut file);
3978 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3979
3980 let max_def_level = match def_levels {
3981 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3982 None => 0i16,
3983 };
3984
3985 let max_rep_level = match rep_levels {
3986 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3987 None => 0i16,
3988 };
3989
3990 let mut max_batch_size = values.len();
3991 if let Some(levels) = def_levels {
3992 max_batch_size = max_batch_size.max(levels.len());
3993 }
3994 if let Some(levels) = rep_levels {
3995 max_batch_size = max_batch_size.max(levels.len());
3996 }
3997
3998 let mut writer =
3999 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
4000
4001 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
4002 assert_eq!(values_written, values.len());
4003 let result = writer.close().unwrap();
4004
4005 drop(write);
4006
4007 let props = ReaderProperties::builder()
4008 .set_backward_compatible_lz4(false)
4009 .build();
4010 let page_reader = Box::new(
4011 SerializedPageReader::new_with_properties(
4012 Arc::new(file),
4013 &result.metadata,
4014 result.rows_written as usize,
4015 None,
4016 Arc::new(props),
4017 )
4018 .unwrap(),
4019 );
4020 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
4021
4022 let mut actual_values = Vec::with_capacity(max_batch_size);
4023 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
4024 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
4025
4026 let (_, values_read, levels_read) = reader
4027 .read_records(
4028 max_batch_size,
4029 actual_def_levels.as_mut(),
4030 actual_rep_levels.as_mut(),
4031 &mut actual_values,
4032 )
4033 .unwrap();
4034
4035 assert_eq!(&actual_values[..values_read], values);
4038 match actual_def_levels {
4039 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
4040 None => assert_eq!(None, def_levels),
4041 }
4042 match actual_rep_levels {
4043 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
4044 None => assert_eq!(None, rep_levels),
4045 }
4046
4047 if let Some(levels) = actual_rep_levels {
4050 let mut actual_rows_written = 0;
4051 for l in levels {
4052 if l == 0 {
4053 actual_rows_written += 1;
4054 }
4055 }
4056 assert_eq!(actual_rows_written, result.rows_written);
4057 } else if actual_def_levels.is_some() {
4058 assert_eq!(levels_read as u64, result.rows_written);
4059 } else {
4060 assert_eq!(values_read as u64, result.rows_written);
4061 }
4062 }
4063
4064 fn column_write_and_get_metadata<T: DataType>(
4067 props: WriterProperties,
4068 values: &[T::T],
4069 ) -> ColumnChunkMetaData {
4070 let page_writer = get_test_page_writer();
4071 let props = Arc::new(props);
4072 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4073 writer.write_batch(values, None, None).unwrap();
4074 writer.close().unwrap().metadata
4075 }
4076
4077 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4079 PageEncodingStats {
4080 page_type,
4081 encoding,
4082 count,
4083 }
4084 }
4085
4086 fn check_encoding_write_support<T: DataType>(
4090 version: WriterVersion,
4091 dict_enabled: bool,
4092 data: &[T::T],
4093 dictionary_page_offset: Option<i64>,
4094 encodings: &[Encoding],
4095 page_encoding_stats: &[PageEncodingStats],
4096 ) {
4097 let props = WriterProperties::builder()
4098 .set_writer_version(version)
4099 .set_dictionary_enabled(dict_enabled)
4100 .build();
4101 let meta = column_write_and_get_metadata::<T>(props, data);
4102 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4103 assert_eq!(meta.encodings(), encodings);
4104 assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4105 }
4106
4107 fn get_test_column_writer<'a, T: DataType>(
4109 page_writer: Box<dyn PageWriter + 'a>,
4110 max_def_level: i16,
4111 max_rep_level: i16,
4112 props: WriterPropertiesPtr,
4113 ) -> ColumnWriterImpl<'a, T> {
4114 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4115 let column_writer = get_column_writer(descr, props, page_writer);
4116 get_typed_column_writer::<T>(column_writer)
4117 }
4118
4119 fn get_test_column_reader<T: DataType>(
4121 page_reader: Box<dyn PageReader>,
4122 max_def_level: i16,
4123 max_rep_level: i16,
4124 ) -> ColumnReaderImpl<T> {
4125 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4126 let column_reader = get_column_reader(descr, page_reader);
4127 get_typed_column_reader::<T>(column_reader)
4128 }
4129
4130 fn get_test_column_descr<T: DataType>(
4132 max_def_level: i16,
4133 max_rep_level: i16,
4134 ) -> ColumnDescriptor {
4135 let path = ColumnPath::from("col");
4136 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4137 .with_length(1)
4140 .build()
4141 .unwrap();
4142 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4143 }
4144
4145 fn get_test_page_writer() -> Box<dyn PageWriter> {
4147 Box::new(TestPageWriter {})
4148 }
4149
4150 struct TestPageWriter {}
4151
4152 impl PageWriter for TestPageWriter {
4153 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4154 let mut res = PageWriteSpec::new();
4155 res.page_type = page.page_type();
4156 res.uncompressed_size = page.uncompressed_size();
4157 res.compressed_size = page.compressed_size();
4158 res.num_values = page.num_values();
4159 res.offset = 0;
4160 res.bytes_written = page.data().len() as u64;
4161 Ok(res)
4162 }
4163
4164 fn close(&mut self) -> Result<()> {
4165 Ok(())
4166 }
4167 }
4168
4169 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4171 let page_writer = get_test_page_writer();
4172 let props = Default::default();
4173 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4174 writer.write_batch(values, None, None).unwrap();
4175
4176 let metadata = writer.close().unwrap().metadata;
4177 if let Some(stats) = metadata.statistics() {
4178 stats.clone()
4179 } else {
4180 panic!("metadata missing statistics");
4181 }
4182 }
4183
4184 fn get_test_decimals_column_writer<T: DataType>(
4186 page_writer: Box<dyn PageWriter>,
4187 max_def_level: i16,
4188 max_rep_level: i16,
4189 props: WriterPropertiesPtr,
4190 ) -> ColumnWriterImpl<'static, T> {
4191 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4192 max_def_level,
4193 max_rep_level,
4194 ));
4195 let column_writer = get_column_writer(descr, props, page_writer);
4196 get_typed_column_writer::<T>(column_writer)
4197 }
4198
4199 fn get_test_decimals_column_descr<T: DataType>(
4201 max_def_level: i16,
4202 max_rep_level: i16,
4203 ) -> ColumnDescriptor {
4204 let path = ColumnPath::from("col");
4205 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4206 .with_length(16)
4207 .with_logical_type(Some(LogicalType::Decimal {
4208 scale: 2,
4209 precision: 3,
4210 }))
4211 .with_scale(2)
4212 .with_precision(3)
4213 .build()
4214 .unwrap();
4215 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4216 }
4217
4218 fn float16_statistics_roundtrip(
4219 values: &[FixedLenByteArray],
4220 ) -> ValueStatistics<FixedLenByteArray> {
4221 let page_writer = get_test_page_writer();
4222 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4223 writer.write_batch(values, None, None).unwrap();
4224
4225 let metadata = writer.close().unwrap().metadata;
4226 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4227 stats.clone()
4228 } else {
4229 panic!("metadata missing statistics");
4230 }
4231 }
4232
4233 fn get_test_float16_column_writer(
4234 page_writer: Box<dyn PageWriter>,
4235 props: WriterPropertiesPtr,
4236 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4237 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4238 let column_writer = get_column_writer(descr, props, page_writer);
4239 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4240 }
4241
4242 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4243 let path = ColumnPath::from("col");
4244 let tpe =
4245 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4246 .with_length(2)
4247 .with_logical_type(Some(LogicalType::Float16))
4248 .build()
4249 .unwrap();
4250 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4251 }
4252
4253 fn get_test_interval_column_writer(
4254 page_writer: Box<dyn PageWriter>,
4255 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4256 let descr = Arc::new(get_test_interval_column_descr());
4257 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4258 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4259 }
4260
4261 fn get_test_interval_column_descr() -> ColumnDescriptor {
4262 let path = ColumnPath::from("col");
4263 let tpe =
4264 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4265 .with_length(12)
4266 .with_converted_type(ConvertedType::INTERVAL)
4267 .build()
4268 .unwrap();
4269 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4270 }
4271
4272 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4274 page_writer: Box<dyn PageWriter + 'a>,
4275 max_def_level: i16,
4276 max_rep_level: i16,
4277 props: WriterPropertiesPtr,
4278 ) -> ColumnWriterImpl<'a, T> {
4279 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4280 max_def_level,
4281 max_rep_level,
4282 ));
4283 let column_writer = get_column_writer(descr, props, page_writer);
4284 get_typed_column_writer::<T>(column_writer)
4285 }
4286
4287 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4289 max_def_level: i16,
4290 max_rep_level: i16,
4291 ) -> ColumnDescriptor {
4292 let path = ColumnPath::from("col");
4293 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4294 .with_converted_type(ConvertedType::UINT_32)
4295 .build()
4296 .unwrap();
4297 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4298 }
4299
#[test]
fn test_page_v2_snappy_compression_fallback() {
    // A single one-byte value is written as a v2 data page with SNAPPY enabled. The
    // test expects the compressed and uncompressed sizes to match, i.e. the page is
    // kept uncompressed.
    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false)
        .set_compression(Compression::SNAPPY)
        .build();

    let mut column_writer = get_test_column_writer::<ByteArrayType>(
        Box::new(TestPageWriter {}),
        0,
        0,
        Arc::new(props),
    );
    column_writer
        .write_batch(&[ByteArray::from("a")], None, None)
        .unwrap();

    let metadata = column_writer.close().unwrap().metadata;
    assert_eq!(metadata.uncompressed_size(), metadata.compressed_size());
}
4328}