1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30 BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43 OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
/// Dispatches over every variant of [`ColumnWriter`].
///
/// `$writer` is matched against each variant, the wrapped typed writer is bound
/// to the identifier `$typed`, and `$body` is evaluated with that binding in
/// scope. Variants are spelled through `Self::`, so the macro can only be
/// expanded inside an `impl ColumnWriter` block.
macro_rules! downcast_writer {
    ($writer:expr, $typed:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($typed) => $body,
            Self::Int32ColumnWriter($typed) => $body,
            Self::Int64ColumnWriter($typed) => $body,
            Self::Int96ColumnWriter($typed) => $body,
            Self::FloatColumnWriter($typed) => $body,
            Self::DoubleColumnWriter($typed) => $body,
            Self::ByteArrayColumnWriter($typed) => $body,
            Self::FixedLenByteArrayColumnWriter($typed) => $body,
        }
    };
}
67
/// Column writer for a Parquet column, with one variant per physical type.
///
/// Each variant wraps the [`ColumnWriterImpl`] specialised for that type.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean values.
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 values.
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 values.
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 values.
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float values.
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double values.
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array values.
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed-length byte array values.
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
90impl ColumnWriter<'_> {
91 #[cfg(feature = "arrow")]
93 pub(crate) fn memory_size(&self) -> usize {
94 downcast_writer!(self, typed, typed.memory_size())
95 }
96
97 #[cfg(feature = "arrow")]
99 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
100 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
101 }
102
103 pub fn close(self) -> Result<ColumnCloseResult> {
105 downcast_writer!(self, typed, typed.close())
106 }
107}
108
109pub fn get_column_writer<'a>(
111 descr: ColumnDescPtr,
112 props: WriterPropertiesPtr,
113 page_writer: Box<dyn PageWriter + 'a>,
114) -> ColumnWriter<'a> {
115 match descr.physical_type() {
116 Type::BOOLEAN => {
117 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
118 }
119 Type::INT32 => {
120 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
121 }
122 Type::INT64 => {
123 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124 }
125 Type::INT96 => {
126 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127 }
128 Type::FLOAT => {
129 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130 }
131 Type::DOUBLE => {
132 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133 }
134 Type::BYTE_ARRAY => {
135 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136 }
137 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
138 ColumnWriterImpl::new(descr, props, page_writer),
139 ),
140 }
141}
142
143pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
148 T::get_column_writer(col_writer).unwrap_or_else(|| {
149 panic!(
150 "Failed to convert column writer into a typed column writer for `{}` type",
151 T::get_physical_type()
152 )
153 })
154}
155
156pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
158 col_writer: &'b ColumnWriter<'a>,
159) -> &'b ColumnWriterImpl<'a, T> {
160 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
161 panic!(
162 "Failed to convert column writer into a typed column writer for `{}` type",
163 T::get_physical_type()
164 )
165 })
166}
167
168pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
170 col_writer: &'a mut ColumnWriter<'b>,
171) -> &'a mut ColumnWriterImpl<'b, T> {
172 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
173 panic!(
174 "Failed to convert column writer into a typed column writer for `{}` type",
175 T::get_physical_type()
176 )
177 })
178}
179
/// Everything produced by closing a column writer.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written for this column chunk.
    pub bytes_written: u64,
    /// Total number of rows written for this column chunk.
    pub rows_written: u64,
    /// Metadata built for this column chunk.
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter flushed from the value encoder, if it produced one.
    pub bloom_filter: Option<Sbbf>,
    /// Column index, present only when the column index builder was still valid at close.
    pub column_index: Option<ColumnIndexMetaData>,
    /// Offset index, present only when an offset index builder was in use.
    pub offset_index: Option<OffsetIndexMetaData>,
}
198
/// Counters and level histograms for the data page currently being buffered.
#[derive(Default)]
struct PageMetrics {
    /// Number of levels buffered for the page (grows by the level count of each mini batch).
    num_buffered_values: u32,
    /// Number of rows buffered for the page.
    num_buffered_rows: u32,
    /// Number of buffered levels whose definition level is below the column maximum.
    num_page_nulls: u64,
    /// Repetition level histogram for the page, if one was initialized.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Definition level histogram for the page, if one was initialized.
    definition_level_histogram: Option<LevelHistogram>,
}
208
209impl PageMetrics {
210 fn new() -> Self {
211 Default::default()
212 }
213
214 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
216 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
217 self
218 }
219
220 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
222 self.definition_level_histogram = LevelHistogram::try_new(max_level);
223 self
224 }
225
226 fn new_page(&mut self) {
229 self.num_buffered_values = 0;
230 self.num_buffered_rows = 0;
231 self.num_page_nulls = 0;
232 self.repetition_level_histogram
233 .as_mut()
234 .map(LevelHistogram::reset);
235 self.definition_level_histogram
236 .as_mut()
237 .map(LevelHistogram::reset);
238 }
239
240 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
242 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
243 rep_hist.update_from_levels(levels);
244 }
245 }
246
247 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
249 if let Some(ref mut def_hist) = self.definition_level_histogram {
250 def_hist.update_from_levels(levels);
251 }
252 }
253}
254
/// Metrics accumulated over a whole column chunk.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    /// Bytes written so far, summed from the write spec of every page written.
    total_bytes_written: u64,
    /// Rows written so far, summed from every data page added.
    total_rows_written: u64,
    /// Sum of the uncompressed sizes of the pages written.
    total_uncompressed_size: u64,
    /// Sum of the compressed sizes of the pages written.
    total_compressed_size: u64,
    /// Number of values reported by the data pages written.
    total_num_values: u64,
    /// Offset of the dictionary page, if one has been recorded.
    dictionary_page_offset: Option<u64>,
    /// Offset of the first data page written, if any.
    data_page_offset: Option<u64>,
    /// Smallest value seen in the chunk so far, if any.
    min_column_value: Option<T>,
    /// Largest value seen in the chunk so far, if any.
    max_column_value: Option<T>,
    /// Number of nulls in the chunk, summed from the per-page null counts.
    num_column_nulls: u64,
    /// Caller-supplied distinct count; discarded once a write happens while the
    /// encoder already holds values.
    column_distinct_count: Option<u64>,
    /// Running total of the per-page variable-length byte counts, if any page reported one.
    variable_length_bytes: Option<i64>,
    /// Chunk-level repetition level histogram, if one was initialized.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Chunk-level definition level histogram, if one was initialized.
    definition_level_histogram: Option<LevelHistogram>,
}
273
274impl<T: Default> ColumnMetrics<T> {
275 fn new() -> Self {
276 Default::default()
277 }
278
279 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
281 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
282 self
283 }
284
285 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
287 self.definition_level_histogram = LevelHistogram::try_new(max_level);
288 self
289 }
290
291 fn update_histogram(
293 chunk_histogram: &mut Option<LevelHistogram>,
294 page_histogram: &Option<LevelHistogram>,
295 ) {
296 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
297 chunk_hist.add(page_hist);
298 }
299 }
300
301 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
304 ColumnMetrics::<T>::update_histogram(
305 &mut self.definition_level_histogram,
306 &page_metrics.definition_level_histogram,
307 );
308 ColumnMetrics::<T>::update_histogram(
309 &mut self.repetition_level_histogram,
310 &page_metrics.repetition_level_histogram,
311 );
312 }
313
314 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
316 if let Some(var_bytes) = variable_length_bytes {
317 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
318 }
319 }
320}
321
/// Typed column writer: a [`GenericColumnWriter`] whose values are encoded by
/// [`ColumnValueEncoderImpl`] for the data type `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
324
/// Column writer that is generic over the value encoder `E`.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    /// Descriptor of the column being written.
    descr: ColumnDescPtr,
    /// Writer properties this writer was configured from.
    props: WriterPropertiesPtr,
    /// Statistics level looked up from `props` for this column's path.
    statistics_enabled: EnabledStatistics,

    /// Sink that receives the finished pages.
    page_writer: Box<dyn PageWriter + 'a>,
    /// Compression looked up from `props` for this column's path.
    codec: Compression,
    /// Codec instance created for `codec`; when `None`, page bytes are written as-is.
    compressor: Option<Box<dyn Codec>>,
    /// Encoder for the column's values.
    encoder: E,

    /// Metrics for the page currently being buffered.
    page_metrics: PageMetrics,
    /// Metrics for the whole column chunk.
    column_metrics: ColumnMetrics<E::T>,

    /// Encodings used by this column chunk; seeded with `RLE` on construction.
    encodings: BTreeSet<Encoding>,
    /// Per page type and encoding page counts, in the order pages were written.
    encoding_stats: Vec<PageEncodingStats>,
    /// Definition levels buffered for the current page.
    def_levels_sink: Vec<i16>,
    /// Repetition levels buffered for the current page.
    rep_levels_sink: Vec<i16>,
    /// Data pages held back while the encoder still has a dictionary.
    data_pages: VecDeque<CompressedPage>,
    /// Builder for the column index; invalidated when page statistics are unavailable.
    column_index_builder: ColumnIndexBuilder,
    /// Builder for the offset index; `None` when the offset index is disabled in `props`.
    offset_index_builder: Option<OffsetIndexBuilder>,

    /// Stays `true` while page min/max values have not been seen to decrease.
    data_page_boundary_ascending: bool,
    /// Stays `true` while page min/max values have not been seen to increase.
    data_page_boundary_descending: bool,
    /// `(min, max)` of the most recent page that had statistics and was not all nulls.
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
360
361impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
362 pub fn new(
364 descr: ColumnDescPtr,
365 props: WriterPropertiesPtr,
366 page_writer: Box<dyn PageWriter + 'a>,
367 ) -> Self {
368 let codec = props.compression(descr.path());
369 let codec_options = CodecOptionsBuilder::default().build();
370 let compressor = create_codec(codec, &codec_options).unwrap();
371 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
372
373 let statistics_enabled = props.statistics_enabled(descr.path());
374
375 let mut encodings = BTreeSet::new();
376 encodings.insert(Encoding::RLE);
378
379 let mut page_metrics = PageMetrics::new();
380 let mut column_metrics = ColumnMetrics::<E::T>::new();
381
382 if statistics_enabled != EnabledStatistics::None {
384 page_metrics = page_metrics
385 .with_repetition_level_histogram(descr.max_rep_level())
386 .with_definition_level_histogram(descr.max_def_level());
387 column_metrics = column_metrics
388 .with_repetition_level_histogram(descr.max_rep_level())
389 .with_definition_level_histogram(descr.max_def_level())
390 }
391
392 let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
394 if statistics_enabled != EnabledStatistics::Page {
395 column_index_builder.to_invalid()
396 }
397
398 let offset_index_builder = match props.offset_index_disabled() {
400 false => Some(OffsetIndexBuilder::new()),
401 _ => None,
402 };
403
404 Self {
405 descr,
406 props,
407 statistics_enabled,
408 page_writer,
409 codec,
410 compressor,
411 encoder,
412 def_levels_sink: vec![],
413 rep_levels_sink: vec![],
414 data_pages: VecDeque::new(),
415 page_metrics,
416 column_metrics,
417 column_index_builder,
418 offset_index_builder,
419 encodings,
420 encoding_stats: vec![],
421 data_page_boundary_ascending: true,
422 data_page_boundary_descending: true,
423 last_non_null_data_page_min_max: None,
424 }
425 }
426
427 #[allow(clippy::too_many_arguments)]
428 pub(crate) fn write_batch_internal(
429 &mut self,
430 values: &E::Values,
431 value_indices: Option<&[usize]>,
432 def_levels: Option<&[i16]>,
433 rep_levels: Option<&[i16]>,
434 min: Option<&E::T>,
435 max: Option<&E::T>,
436 distinct_count: Option<u64>,
437 ) -> Result<usize> {
438 if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
440 if def.len() != rep.len() {
441 return Err(general_err!(
442 "Inconsistent length of definition and repetition levels: {} != {}",
443 def.len(),
444 rep.len()
445 ));
446 }
447 }
448
449 let num_levels = match def_levels {
460 Some(def_levels) => def_levels.len(),
461 None => values.len(),
462 };
463
464 if let Some(min) = min {
465 update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
466 }
467 if let Some(max) = max {
468 update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
469 }
470
471 if self.encoder.num_values() == 0 {
473 self.column_metrics.column_distinct_count = distinct_count;
474 } else {
475 self.column_metrics.column_distinct_count = None;
476 }
477
478 let mut values_offset = 0;
479 let mut levels_offset = 0;
480 let base_batch_size = self.props.write_batch_size();
481 while levels_offset < num_levels {
482 let mut end_offset = num_levels.min(levels_offset + base_batch_size);
483
484 if let Some(r) = rep_levels {
486 while end_offset < r.len() && r[end_offset] != 0 {
487 end_offset += 1;
488 }
489 }
490
491 values_offset += self.write_mini_batch(
492 values,
493 values_offset,
494 value_indices,
495 end_offset - levels_offset,
496 def_levels.map(|lv| &lv[levels_offset..end_offset]),
497 rep_levels.map(|lv| &lv[levels_offset..end_offset]),
498 )?;
499 levels_offset = end_offset;
500 }
501
502 Ok(values_offset)
504 }
505
506 pub fn write_batch(
519 &mut self,
520 values: &E::Values,
521 def_levels: Option<&[i16]>,
522 rep_levels: Option<&[i16]>,
523 ) -> Result<usize> {
524 self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
525 }
526
527 pub fn write_batch_with_statistics(
535 &mut self,
536 values: &E::Values,
537 def_levels: Option<&[i16]>,
538 rep_levels: Option<&[i16]>,
539 min: Option<&E::T>,
540 max: Option<&E::T>,
541 distinct_count: Option<u64>,
542 ) -> Result<usize> {
543 self.write_batch_internal(
544 values,
545 None,
546 def_levels,
547 rep_levels,
548 min,
549 max,
550 distinct_count,
551 )
552 }
553
554 #[cfg(feature = "arrow")]
559 pub(crate) fn memory_size(&self) -> usize {
560 self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
561 }
562
563 pub fn get_total_bytes_written(&self) -> u64 {
569 self.column_metrics.total_bytes_written
570 }
571
572 #[cfg(feature = "arrow")]
578 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
579 self.data_pages
580 .iter()
581 .map(|page| page.data().len() as u64)
582 .sum::<u64>()
583 + self.column_metrics.total_bytes_written
584 + self.encoder.estimated_data_page_size() as u64
585 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
586 }
587
588 pub fn get_total_rows_written(&self) -> u64 {
591 self.column_metrics.total_rows_written
592 }
593
594 pub fn get_descriptor(&self) -> &ColumnDescPtr {
596 &self.descr
597 }
598
599 pub fn close(mut self) -> Result<ColumnCloseResult> {
602 if self.page_metrics.num_buffered_values > 0 {
603 self.add_data_page()?;
604 }
605 if self.encoder.has_dictionary() {
606 self.write_dictionary_page()?;
607 }
608 self.flush_data_pages()?;
609 let metadata = self.build_column_metadata()?;
610 self.page_writer.close()?;
611
612 let boundary_order = match (
613 self.data_page_boundary_ascending,
614 self.data_page_boundary_descending,
615 ) {
616 (true, _) => BoundaryOrder::ASCENDING,
619 (false, true) => BoundaryOrder::DESCENDING,
620 (false, false) => BoundaryOrder::UNORDERED,
621 };
622 self.column_index_builder.set_boundary_order(boundary_order);
623
624 let column_index = match self.column_index_builder.valid() {
625 true => Some(self.column_index_builder.build()?),
626 false => None,
627 };
628
629 let offset_index = self.offset_index_builder.map(|b| b.build());
630
631 Ok(ColumnCloseResult {
632 bytes_written: self.column_metrics.total_bytes_written,
633 rows_written: self.column_metrics.total_rows_written,
634 bloom_filter: self.encoder.flush_bloom_filter(),
635 metadata,
636 column_index,
637 offset_index,
638 })
639 }
640
641 fn write_mini_batch(
645 &mut self,
646 values: &E::Values,
647 values_offset: usize,
648 value_indices: Option<&[usize]>,
649 num_levels: usize,
650 def_levels: Option<&[i16]>,
651 rep_levels: Option<&[i16]>,
652 ) -> Result<usize> {
653 let values_to_write = if self.descr.max_def_level() > 0 {
655 let levels = def_levels.ok_or_else(|| {
656 general_err!(
657 "Definition levels are required, because max definition level = {}",
658 self.descr.max_def_level()
659 )
660 })?;
661
662 let values_to_write = levels
663 .iter()
664 .map(|level| (*level == self.descr.max_def_level()) as usize)
665 .sum();
666 self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
667
668 self.page_metrics.update_definition_level_histogram(levels);
670
671 self.def_levels_sink.extend_from_slice(levels);
672 values_to_write
673 } else {
674 num_levels
675 };
676
677 if self.descr.max_rep_level() > 0 {
679 let levels = rep_levels.ok_or_else(|| {
681 general_err!(
682 "Repetition levels are required, because max repetition level = {}",
683 self.descr.max_rep_level()
684 )
685 })?;
686
687 if !levels.is_empty() && levels[0] != 0 {
688 return Err(general_err!(
689 "Write must start at a record boundary, got non-zero repetition level of {}",
690 levels[0]
691 ));
692 }
693
694 for &level in levels {
696 self.page_metrics.num_buffered_rows += (level == 0) as u32
697 }
698
699 self.page_metrics.update_repetition_level_histogram(levels);
701
702 self.rep_levels_sink.extend_from_slice(levels);
703 } else {
704 self.page_metrics.num_buffered_rows += num_levels as u32;
707 }
708
709 match value_indices {
710 Some(indices) => {
711 let indices = &indices[values_offset..values_offset + values_to_write];
712 self.encoder.write_gather(values, indices)?;
713 }
714 None => self.encoder.write(values, values_offset, values_to_write)?,
715 }
716
717 self.page_metrics.num_buffered_values += num_levels as u32;
718
719 if self.should_add_data_page() {
720 self.add_data_page()?;
721 }
722
723 if self.should_dict_fallback() {
724 self.dict_fallback()?;
725 }
726
727 Ok(values_to_write)
728 }
729
730 #[inline]
735 fn should_dict_fallback(&self) -> bool {
736 match self.encoder.estimated_dict_page_size() {
737 Some(size) => {
738 size >= self
739 .props
740 .column_dictionary_page_size_limit(self.descr.path())
741 }
742 None => false,
743 }
744 }
745
746 #[inline]
748 fn should_add_data_page(&self) -> bool {
749 if self.page_metrics.num_buffered_values == 0 {
754 return false;
755 }
756
757 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
758 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
759 }
760
761 fn dict_fallback(&mut self) -> Result<()> {
764 if self.page_metrics.num_buffered_values > 0 {
766 self.add_data_page()?;
767 }
768 self.write_dictionary_page()?;
769 self.flush_data_pages()?;
770 Ok(())
771 }
772
    /// Records the page that is about to be emitted in the column index and
    /// offset index builders.
    ///
    /// * `page_statistics` - min/max statistics of the page, if available.
    /// * `page_variable_length_bytes` - forwarded to the offset index builder.
    ///
    /// # Panics
    ///
    /// Panics if `page_statistics` is present but lacks a min or max value.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // The page is treated as a null page when its null count equals its row count.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        if null_page && self.column_index_builder.valid() {
            // Null pages are recorded with empty min and max values.
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            match &page_statistics {
                // Without page statistics the column index cannot be completed,
                // so it is invalidated for the rest of the chunk.
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    // Compare against the previous page that had statistics to track
                    // whether page boundaries are still ascending and/or descending.
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // A previous min or max greater than the new one breaks ascending order.
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // A new min or max greater than the previous one breaks descending order.
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        // Truncatable types store min/max shortened to the column index
                        // truncate length; the "was truncated" flag is not needed here.
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        // Other types store the min/max bytes unmodified.
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // The page's level histograms are handed to the builder on every call.
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // The offset index, when enabled, records the row count and byte-array size now;
        // the page offset and size are appended when the page is actually written.
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
862
863 fn can_truncate_value(&self) -> bool {
865 match self.descr.physical_type() {
866 Type::FIXED_LEN_BYTE_ARRAY
870 if !matches!(
871 self.descr.logical_type_ref(),
872 Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
873 ) =>
874 {
875 true
876 }
877 Type::BYTE_ARRAY => true,
878 _ => false,
880 }
881 }
882
883 fn is_utf8(&self) -> bool {
885 self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
886 || self.get_descriptor().converted_type() == ConvertedType::UTF8
887 }
888
889 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
900 truncation_length
901 .filter(|l| data.len() > *l)
902 .and_then(|l|
903 if self.is_utf8() {
905 match str::from_utf8(data) {
906 Ok(str_data) => truncate_utf8(str_data, l),
907 Err(_) => Some(data[..l].to_vec()),
908 }
909 } else {
910 Some(data[..l].to_vec())
911 }
912 )
913 .map(|truncated| (truncated, true))
914 .unwrap_or_else(|| (data.to_vec(), false))
915 }
916
917 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
931 truncation_length
932 .filter(|l| data.len() > *l)
933 .and_then(|l|
934 if self.is_utf8() {
936 match str::from_utf8(data) {
937 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
938 Err(_) => increment(data[..l].to_vec()),
939 }
940 } else {
941 increment(data[..l].to_vec())
942 }
943 )
944 .map(|truncated| (truncated, true))
945 .unwrap_or_else(|| (data.to_vec(), false))
946 }
947
948 fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
951 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
952 match statistics {
953 Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
954 let (min, did_truncate_min) = self.truncate_min_value(
955 self.props.statistics_truncate_length(),
956 stats.min_bytes_opt().unwrap(),
957 );
958 let (max, did_truncate_max) = self.truncate_max_value(
959 self.props.statistics_truncate_length(),
960 stats.max_bytes_opt().unwrap(),
961 );
962 Statistics::ByteArray(
963 ValueStatistics::new(
964 Some(min.into()),
965 Some(max.into()),
966 stats.distinct_count(),
967 stats.null_count_opt(),
968 backwards_compatible_min_max,
969 )
970 .with_max_is_exact(!did_truncate_max)
971 .with_min_is_exact(!did_truncate_min),
972 )
973 }
974 Statistics::FixedLenByteArray(stats)
975 if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
976 {
977 let (min, did_truncate_min) = self.truncate_min_value(
978 self.props.statistics_truncate_length(),
979 stats.min_bytes_opt().unwrap(),
980 );
981 let (max, did_truncate_max) = self.truncate_max_value(
982 self.props.statistics_truncate_length(),
983 stats.max_bytes_opt().unwrap(),
984 );
985 Statistics::FixedLenByteArray(
986 ValueStatistics::new(
987 Some(min.into()),
988 Some(max.into()),
989 stats.distinct_count(),
990 stats.null_count_opt(),
991 backwards_compatible_min_max,
992 )
993 .with_max_is_exact(!did_truncate_max)
994 .with_min_is_exact(!did_truncate_min),
995 )
996 }
997 stats => stats,
998 }
999 }
1000
    /// Turns the currently buffered levels and values into a data page.
    ///
    /// The page is either queued (while the encoder still has a dictionary) or
    /// written straight to the page writer. Chunk statistics, the column and
    /// offset indexes, and the row count are updated, and the per-page state is
    /// reset afterwards.
    ///
    /// # Errors
    ///
    /// Fails if flushing the encoder, compressing the page or writing it fails.
    fn add_data_page(&mut self) -> Result<()> {
        // Take the encoded values (and their statistics) out of the encoder.
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Chunk-level min/max are updated whenever the page reports both values.
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Page-level statistics are only built when page statistics are enabled.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // Record this page in the column index and offset index builders.
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Fold the page's histograms and variable-length byte count into the chunk metrics.
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on the statistics are only kept if they go into the page header,
        // in which case they are truncated first.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // Layout: repetition levels, definition levels, values; compressed together.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // In a v2 page only the values are compressed; the levels stay as encoded.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        // If the compressed values are not smaller than the whole
                        // uncompressed page, drop them and store the raw values instead.
                        if uncompressed_size <= buffer.len() - buffer_len {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // While the encoder has a dictionary, data pages are held back;
        // otherwise the page goes straight to the page writer.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset the per-page state for the next page.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1165
1166 #[inline]
1169 fn flush_data_pages(&mut self) -> Result<()> {
1170 if self.page_metrics.num_buffered_values > 0 {
1172 self.add_data_page()?;
1173 }
1174
1175 while let Some(page) = self.data_pages.pop_front() {
1176 self.write_data_page(page)?;
1177 }
1178
1179 Ok(())
1180 }
1181
1182 fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1184 let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1185 let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1186 let num_values = self.column_metrics.total_num_values as i64;
1187 let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1188 let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1190
1191 let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1192 .set_compression(self.codec)
1193 .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
1194 .set_page_encoding_stats(self.encoding_stats.clone())
1195 .set_total_compressed_size(total_compressed_size)
1196 .set_total_uncompressed_size(total_uncompressed_size)
1197 .set_num_values(num_values)
1198 .set_data_page_offset(data_page_offset)
1199 .set_dictionary_page_offset(dict_page_offset);
1200
1201 if self.statistics_enabled != EnabledStatistics::None {
1202 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1203
1204 let statistics = ValueStatistics::<E::T>::new(
1205 self.column_metrics.min_column_value.clone(),
1206 self.column_metrics.max_column_value.clone(),
1207 self.column_metrics.column_distinct_count,
1208 Some(self.column_metrics.num_column_nulls),
1209 false,
1210 )
1211 .with_backwards_compatible_min_max(backwards_compatible_min_max)
1212 .into();
1213
1214 let statistics = self.truncate_statistics(statistics);
1215
1216 builder = builder
1217 .set_statistics(statistics)
1218 .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1219 .set_repetition_level_histogram(
1220 self.column_metrics.repetition_level_histogram.take(),
1221 )
1222 .set_definition_level_histogram(
1223 self.column_metrics.definition_level_histogram.take(),
1224 );
1225
1226 if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
1227 builder = builder.set_geo_statistics(geo_stats);
1228 }
1229 }
1230
1231 builder = self.set_column_chunk_encryption_properties(builder);
1232
1233 let metadata = builder.build()?;
1234 Ok(metadata)
1235 }
1236
1237 #[inline]
1239 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1240 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1241 encoder.put(levels);
1242 encoder.consume()
1243 }
1244
1245 #[inline]
1248 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1249 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1250 encoder.put(levels);
1251 encoder.consume()
1252 }
1253
1254 #[inline]
1256 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1257 self.encodings.insert(page.encoding());
1258 match self.encoding_stats.last_mut() {
1259 Some(encoding_stats)
1260 if encoding_stats.page_type == page.page_type()
1261 && encoding_stats.encoding == page.encoding() =>
1262 {
1263 encoding_stats.count += 1;
1264 }
1265 _ => {
1266 self.encoding_stats.push(PageEncodingStats {
1269 page_type: page.page_type(),
1270 encoding: page.encoding(),
1271 count: 1,
1272 });
1273 }
1274 }
1275 let page_spec = self.page_writer.write_page(page)?;
1276 if let Some(builder) = self.offset_index_builder.as_mut() {
1279 builder
1280 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1281 }
1282 self.update_metrics_for_page(page_spec);
1283 Ok(())
1284 }
1285
1286 #[inline]
1288 fn write_dictionary_page(&mut self) -> Result<()> {
1289 let compressed_page = {
1290 let mut page = self
1291 .encoder
1292 .flush_dict_page()?
1293 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1294
1295 let uncompressed_size = page.buf.len();
1296
1297 if let Some(ref mut cmpr) = self.compressor {
1298 let mut output_buf = Vec::with_capacity(uncompressed_size);
1299 cmpr.compress(&page.buf, &mut output_buf)?;
1300 page.buf = Bytes::from(output_buf);
1301 }
1302
1303 let dict_page = Page::DictionaryPage {
1304 buf: page.buf,
1305 num_values: page.num_values as u32,
1306 encoding: self.props.dictionary_page_encoding(),
1307 is_sorted: page.is_sorted,
1308 };
1309 CompressedPage::new(dict_page, uncompressed_size)
1310 };
1311
1312 self.encodings.insert(compressed_page.encoding());
1313 self.encoding_stats.push(PageEncodingStats {
1314 page_type: PageType::DICTIONARY_PAGE,
1315 encoding: compressed_page.encoding(),
1316 count: 1,
1317 });
1318 let page_spec = self.page_writer.write_page(compressed_page)?;
1319 self.update_metrics_for_page(page_spec);
1320 Ok(())
1322 }
1323
1324 #[inline]
1326 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1327 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1328 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1329 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1330
1331 match page_spec.page_type {
1332 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1333 self.column_metrics.total_num_values += page_spec.num_values as u64;
1334 if self.column_metrics.data_page_offset.is_none() {
1335 self.column_metrics.data_page_offset = Some(page_spec.offset);
1336 }
1337 }
1338 PageType::DICTIONARY_PAGE => {
1339 assert!(
1340 self.column_metrics.dictionary_page_offset.is_none(),
1341 "Dictionary offset is already set"
1342 );
1343 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1344 }
1345 _ => {}
1346 }
1347 }
1348
/// Attaches column crypto metadata to `builder` when the writer properties carry file
/// encryption properties; otherwise returns `builder` unchanged.
#[cfg(feature = "encryption")]
#[inline]
fn set_column_chunk_encryption_properties(
    &self,
    builder: ColumnChunkMetaDataBuilder,
) -> ColumnChunkMetaDataBuilder {
    match self.props.file_encryption_properties.as_ref() {
        None => builder,
        Some(encryption_properties) => {
            let crypto_metadata = get_column_crypto_metadata(encryption_properties, &self.descr);
            builder.set_column_crypto_metadata(crypto_metadata)
        }
    }
}
1364
1365 #[inline]
1366 #[cfg(not(feature = "encryption"))]
1367 fn set_column_chunk_encryption_properties(
1368 &self,
1369 builder: ColumnChunkMetaDataBuilder,
1370 ) -> ColumnChunkMetaDataBuilder {
1371 builder
1372 }
1373}
1374
1375fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1376 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1377}
1378
1379fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1380 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1381}
1382
1383#[inline]
1384#[allow(clippy::eq_op)]
1385fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1386 match T::PHYSICAL_TYPE {
1387 Type::FLOAT | Type::DOUBLE => val != val,
1388 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
1389 let val = val.as_bytes();
1390 let val = f16::from_le_bytes([val[0], val[1]]);
1391 val.is_nan()
1392 }
1393 _ => false,
1394 }
1395}
1396
1397fn update_stat<T: ParquetValueType, F>(
1402 descr: &ColumnDescriptor,
1403 val: &T,
1404 cur: &mut Option<T>,
1405 should_update: F,
1406) where
1407 F: Fn(&T) -> bool,
1408{
1409 if is_nan(descr, val) {
1410 return;
1411 }
1412
1413 if cur.as_ref().is_none_or(should_update) {
1414 *cur = Some(val.clone());
1415 }
1416}
1417
1418fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1420 match T::PHYSICAL_TYPE {
1421 Type::INT32 | Type::INT64 => {
1422 if let Some(LogicalType::Integer {
1423 is_signed: false, ..
1424 }) = descr.logical_type_ref()
1425 {
1426 return compare_greater_unsigned_int(a, b);
1428 }
1429
1430 match descr.converted_type() {
1431 ConvertedType::UINT_8
1432 | ConvertedType::UINT_16
1433 | ConvertedType::UINT_32
1434 | ConvertedType::UINT_64 => {
1435 return compare_greater_unsigned_int(a, b);
1436 }
1437 _ => {}
1438 };
1439 }
1440 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1441 if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
1442 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1443 }
1444 if let ConvertedType::DECIMAL = descr.converted_type() {
1445 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1446 }
1447 if let Some(LogicalType::Float16) = descr.logical_type_ref() {
1448 return compare_greater_f16(a.as_bytes(), b.as_bytes());
1449 }
1450 }
1451
1452 _ => {}
1453 }
1454
1455 a > b
1457}
1458
1459fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1467 match (kind, props.writer_version()) {
1468 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1469 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1470 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1471 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1472 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1473 _ => Encoding::PLAIN,
1474 }
1475}
1476
1477fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1479 match (kind, props.writer_version()) {
1480 (Type::BOOLEAN, _) => false,
1482 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1484 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1485 _ => true,
1486 }
1487}
1488
1489#[inline]
1490fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
1491 a.as_u64().unwrap() > b.as_u64().unwrap()
1492}
1493
1494#[inline]
1495fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1496 let a = f16::from_le_bytes(a.try_into().unwrap());
1497 let b = f16::from_le_bytes(b.try_into().unwrap());
1498 a > b
1499}
1500
/// Signed "greater than" comparison of two big-endian two's-complement byte arrays.
///
/// The operands may have different lengths; the shorter one is treated as if it were
/// sign-extended to the length of the longer one.
///
/// An empty operand is never greater than anything, and any non-empty operand is
/// greater than an empty one.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Short circuit for operands of different sign, or for equal-length operands whose
    // first bytes differ. The equal-length requirement matters because of sign
    // extension: a longer operand may start with padding bytes only.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both operands have the same sign.
    let negative_values = (first_a as i8) < 0;
    let extension: u8 = if negative_values { 0xFF } else { 0 };
    let a_longer = a_length > b_length;

    // Split the extra leading bytes off the longer operand so that both tails have the
    // same length and line up byte for byte. For equal lengths `lead` is empty.
    let (lead, a_tail, b_tail) = if a_longer {
        let (lead, tail) = a.split_at(a_length - b_length);
        (lead, tail, b)
    } else {
        let (lead, tail) = b.split_at(b_length - a_length);
        (lead, a, tail)
    };

    // If the extra bytes are more than sign extension, the longer operand has the
    // larger magnitude: it is the greater one for positive values and the smaller one
    // for negative values.
    if lead.iter().any(|&byte| byte != extension) {
        return if negative_values { !a_longer } else { a_longer };
    }

    // Same sign and same effective width: unsigned lexicographic order of the aligned
    // tails matches numeric order.
    a_tail > b_tail
}
1546
/// Returns the bytes of the longest prefix of `data` that is at most `length` bytes
/// long and ends on a character boundary.
///
/// Returns `None` when no boundary exists in `1..=length`, i.e. when `length` is zero
/// or the first character is wider than `length` bytes.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    (1..=length)
        .rev()
        .find(|&end| data.is_char_boundary(end))
        .map(|end| data.as_bytes()[..end].to_vec())
}
1556
1557fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1563 let lower_bound = length.saturating_sub(3);
1565 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1566 increment_utf8(data.get(..split)?)
1567}
1568
/// Produces the bytes of a string that is greater than `data` by bumping one
/// character, scanning from the last character towards the first.
///
/// A character is only bumped when its successor code point is a valid `char` with
/// the same UTF-8 width; everything after the bumped character is dropped. Returns
/// `None` when no character can be bumped (including for an empty string).
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, current)| {
        let width = current.len_utf8();
        // The successor must exist and must not change the encoded width.
        let successor =
            char::from_u32(current as u32 + 1).filter(|next| next.len_utf8() == width)?;

        let mut bumped = data.as_bytes()[..start + width].to_vec();
        successor.encode_utf8(&mut bumped[start..]);
        Some(bumped)
    })
}
1590
/// Adds one to `data` interpreted as a big-endian unsigned number, carrying from the
/// last byte towards the first.
///
/// Returns `None` when every byte overflowed (all bytes were `0xFF`) or when `data`
/// is empty.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // `all` stops at the first byte that absorbs the carry without wrapping to zero.
    let every_byte_wrapped = data.iter_mut().rev().all(|byte| {
        *byte = byte.wrapping_add(1);
        *byte == 0
    });
    (!every_byte_wrapped).then_some(data)
}
1606
1607#[cfg(test)]
1608mod tests {
1609 use crate::{
1610 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1611 schema::parser::parse_message_type,
1612 };
1613 use core::str;
1614 use rand::distr::uniform::SampleUniform;
1615 use std::{fs::File, sync::Arc};
1616
1617 use crate::column::{
1618 page::PageReader,
1619 reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1620 };
1621 use crate::file::writer::TrackedWrite;
1622 use crate::file::{
1623 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1624 };
1625 use crate::schema::types::{ColumnPath, Type as SchemaType};
1626 use crate::util::test_common::rand_gen::random_numbers_range;
1627
1628 use super::*;
1629
/// `write_batch` must reject definition and repetition level slices of different
/// lengths.
#[test]
fn test_column_writer_inconsistent_def_rep_length() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 1, Default::default());

    // Three definition levels but only two repetition levels.
    let outcome = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));

    let message = outcome.err().map(|error| error.to_string());
    assert_eq!(
        message.as_deref(),
        Some("Parquet error: Inconsistent length of definition and repetition levels: 3 != 2")
    );
}
1644
/// Omitting definition levels for a column whose max definition level is 1 is an
/// error.
#[test]
fn test_column_writer_invalid_def_levels() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 0, Default::default());

    let outcome = writer.write_batch(&[1, 2, 3, 4], None, None);

    let message = outcome.err().map(|error| error.to_string());
    assert_eq!(
        message.as_deref(),
        Some("Parquet error: Definition levels are required, because max definition level = 1")
    );
}
1659
/// Omitting repetition levels for a column whose max repetition level is 1 is an
/// error.
#[test]
fn test_column_writer_invalid_rep_levels() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 1, Default::default());

    let outcome = writer.write_batch(&[1, 2, 3, 4], None, None);

    let message = outcome.err().map(|error| error.to_string());
    assert_eq!(
        message.as_deref(),
        Some("Parquet error: Repetition levels are required, because max repetition level = 1")
    );
}
1674
/// Supplying fewer values than the definition levels call for is an error.
#[test]
fn test_column_writer_not_enough_values_to_write() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 0, Default::default());

    // Four definition levels at the max level, but only two values.
    let outcome = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);

    let message = outcome.err().map(|error| error.to_string());
    assert_eq!(
        message.as_deref(),
        Some("Parquet error: Expected to write 4 values, but have only 2")
    );
}
1689
/// The dictionary page can be written once; a second attempt reports that the
/// dictionary encoder is no longer set.
#[test]
fn test_column_writer_write_only_one_dictionary_page() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.add_data_page().unwrap();

    // The first dictionary page write succeeds.
    writer.write_dictionary_page().unwrap();

    // The second one must fail.
    let second_attempt = writer.write_dictionary_page();
    assert_eq!(
        second_attempt.unwrap_err().to_string(),
        "Parquet error: Dictionary encoder is not set"
    );
}
1702
/// Writing a dictionary page fails when dictionary encoding is disabled in the
/// writer properties.
#[test]
fn test_column_writer_error_when_writing_disabled_dictionary() {
    let no_dictionary_props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .build();
    let mut writer = get_test_column_writer::<Int32Type>(
        get_test_page_writer(),
        0,
        0,
        Arc::new(no_dictionary_props),
    );
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

    let attempt = writer.write_dictionary_page();
    assert_eq!(
        attempt.unwrap_err().to_string(),
        "Parquet error: Dictionary encoder is not set"
    );
}
1716
/// Even with dictionary encoding enabled, a boolean column is written without a
/// dictionary page.
#[test]
fn test_column_writer_boolean_type_does_not_support_dictionary() {
    let dictionary_props = WriterProperties::builder()
        .set_dictionary_enabled(true)
        .build();
    let mut writer = get_test_column_writer::<BoolType>(
        get_test_page_writer(),
        0,
        0,
        Arc::new(dictionary_props),
    );
    writer
        .write_batch(&[true, false, true, false], None, None)
        .unwrap();

    let closed = writer.close().unwrap();
    assert_eq!(closed.bytes_written, 1);
    assert_eq!(closed.rows_written, 4);

    let metadata = closed.metadata;
    let encodings: Vec<_> = metadata.encodings().collect();
    assert_eq!(encodings, vec![Encoding::PLAIN, Encoding::RLE]);
    assert_eq!(metadata.num_values(), 4);
    // No dictionary page may have been recorded.
    assert_eq!(metadata.dictionary_page_offset(), None);
}
1744
/// Checks the encodings chosen for boolean columns under both writer versions.
///
/// The expectations are identical with the dictionary flag on and off, so each writer
/// version is checked in a loop over that flag.
#[test]
fn test_column_writer_default_encoding_support_bool() {
    // Writer version 1: PLAIN values in a v1 data page.
    for dictionary_enabled in [true, false] {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            dictionary_enabled,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
    }

    // Writer version 2: RLE values in a v2 data page.
    for dictionary_enabled in [true, false] {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            dictionary_enabled,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
}
1780
/// Checks the encodings chosen for `INT32` columns for each combination of writer
/// version and dictionary flag.
#[test]
fn test_column_writer_default_encoding_support_int32() {
    let values = [1, 2];
    let dictionary_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    // Expected page stats when a dictionary page precedes a single data page.
    let dictionary_stats = |data_page_type: PageType| {
        [
            encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
            encoding_stats(data_page_type, Encoding::RLE_DICTIONARY, 1),
        ]
    };

    // Version 1, dictionary enabled.
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE),
    );
    // Version 1, dictionary disabled.
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &[Encoding::PLAIN, Encoding::RLE],
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );
    // Version 2, dictionary enabled.
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE_V2),
    );
    // Version 2, dictionary disabled.
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
        &[encoding_stats(
            PageType::DATA_PAGE_V2,
            Encoding::DELTA_BINARY_PACKED,
            1,
        )],
    );
}
1826
/// Checks the encodings chosen for `INT64` columns for each combination of writer
/// version and dictionary flag.
#[test]
fn test_column_writer_default_encoding_support_int64() {
    let values = [1, 2];
    let dictionary_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    // Expected page stats when a dictionary page precedes a single data page.
    let dictionary_stats = |data_page_type: PageType| {
        [
            encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
            encoding_stats(data_page_type, Encoding::RLE_DICTIONARY, 1),
        ]
    };

    // Version 1, dictionary enabled.
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE),
    );
    // Version 1, dictionary disabled.
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &[Encoding::PLAIN, Encoding::RLE],
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );
    // Version 2, dictionary enabled.
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE_V2),
    );
    // Version 2, dictionary disabled.
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
        &[encoding_stats(
            PageType::DATA_PAGE_V2,
            Encoding::DELTA_BINARY_PACKED,
            1,
        )],
    );
}
1872
/// Checks the encodings chosen for `INT96` columns for each combination of writer
/// version and dictionary flag.
#[test]
fn test_column_writer_default_encoding_support_int96() {
    let values = [Int96::from(vec![1, 2, 3])];
    let dictionary_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_encodings = [Encoding::PLAIN, Encoding::RLE];
    // Expected page stats when a dictionary page precedes a single data page.
    let dictionary_stats = |data_page_type: PageType| {
        [
            encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
            encoding_stats(data_page_type, Encoding::RLE_DICTIONARY, 1),
        ]
    };

    // Version 1, dictionary enabled.
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE),
    );
    // Version 1, dictionary disabled.
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );
    // Version 2, dictionary enabled.
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE_V2),
    );
    // Version 2, dictionary disabled: still PLAIN, but in a v2 data page.
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
    );
}
1914
/// Checks the encodings chosen for `FLOAT` columns for each combination of writer
/// version and dictionary flag.
#[test]
fn test_column_writer_default_encoding_support_float() {
    let values = [1.0, 2.0];
    let dictionary_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_encodings = [Encoding::PLAIN, Encoding::RLE];
    // Expected page stats when a dictionary page precedes a single data page.
    let dictionary_stats = |data_page_type: PageType| {
        [
            encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
            encoding_stats(data_page_type, Encoding::RLE_DICTIONARY, 1),
        ]
    };

    // Version 1, dictionary enabled.
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE),
    );
    // Version 1, dictionary disabled.
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );
    // Version 2, dictionary enabled.
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE_V2),
    );
    // Version 2, dictionary disabled: still PLAIN, but in a v2 data page.
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
    );
}
1956
/// Checks the encodings chosen for `DOUBLE` columns for each combination of writer
/// version and dictionary flag.
#[test]
fn test_column_writer_default_encoding_support_double() {
    let values = [1.0, 2.0];
    let dictionary_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_encodings = [Encoding::PLAIN, Encoding::RLE];
    // Expected page stats when a dictionary page precedes a single data page.
    let dictionary_stats = |data_page_type: PageType| {
        [
            encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
            encoding_stats(data_page_type, Encoding::RLE_DICTIONARY, 1),
        ]
    };

    // Version 1, dictionary enabled.
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE),
    );
    // Version 1, dictionary disabled.
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );
    // Version 2, dictionary enabled.
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE_V2),
    );
    // Version 2, dictionary disabled: still PLAIN, but in a v2 data page.
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
    );
}
1998
/// Checks the encodings chosen for `BYTE_ARRAY` columns for each combination of
/// writer version and dictionary flag.
#[test]
fn test_column_writer_default_encoding_support_byte_array() {
    let values = [ByteArray::from(vec![1u8])];
    let dictionary_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    // Expected page stats when a dictionary page precedes a single data page.
    let dictionary_stats = |data_page_type: PageType| {
        [
            encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
            encoding_stats(data_page_type, Encoding::RLE_DICTIONARY, 1),
        ]
    };

    // Version 1, dictionary enabled.
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_1_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE),
    );
    // Version 1, dictionary disabled.
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_1_0,
        false,
        &values,
        None,
        &[Encoding::PLAIN, Encoding::RLE],
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );
    // Version 2, dictionary enabled.
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_2_0,
        true,
        &values,
        Some(0),
        &dictionary_encodings,
        &dictionary_stats(PageType::DATA_PAGE_V2),
    );
    // Version 2, dictionary disabled.
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_2_0,
        false,
        &values,
        None,
        &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
        &[encoding_stats(
            PageType::DATA_PAGE_V2,
            Encoding::DELTA_BYTE_ARRAY,
            1,
        )],
    );
}
2044
/// Checks the encodings chosen for `FIXED_LEN_BYTE_ARRAY` columns.
///
/// Under writer version 1 the expectations are the same with the dictionary flag on
/// and off (no dictionary page), so that version is checked in a loop over the flag.
#[test]
fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
    // Version 1: PLAIN in a v1 data page regardless of the dictionary flag.
    for dictionary_enabled in [true, false] {
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            dictionary_enabled,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
    }

    // Version 2, dictionary enabled.
    let dictionary_stats = [
        encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
        encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<FixedLenByteArrayType>(
        WriterVersion::PARQUET_2_0,
        true,
        &[ByteArray::from(vec![1u8]).into()],
        Some(0),
        &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        &dictionary_stats,
    );

    // Version 2, dictionary disabled.
    let delta_stats = [encoding_stats(
        PageType::DATA_PAGE_V2,
        Encoding::DELTA_BYTE_ARRAY,
        1,
    )];
    check_encoding_write_support::<FixedLenByteArrayType>(
        WriterVersion::PARQUET_2_0,
        false,
        &[ByteArray::from(vec![1u8]).into()],
        None,
        &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
        &delta_stats,
    );
}
2087
/// Verifies the close result and column chunk metadata after writing four `INT32`
/// values with default properties.
#[test]
fn test_column_writer_check_metadata() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

    let closed = writer.close().unwrap();
    assert_eq!(closed.bytes_written, 20);
    assert_eq!(closed.rows_written, 4);

    let metadata = closed.metadata;
    let encodings: Vec<_> = metadata.encodings().collect();
    assert_eq!(
        encodings,
        vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
    );
    assert_eq!(metadata.num_values(), 4);
    assert_eq!(metadata.compressed_size(), 20);
    assert_eq!(metadata.uncompressed_size(), 20);
    assert_eq!(metadata.data_page_offset(), 0);
    assert_eq!(metadata.dictionary_page_offset(), Some(0));

    let Some(stats) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);

    let Statistics::Int32(int32_stats) = stats else {
        panic!("expecting Statistics::Int32");
    };
    assert_eq!(int32_stats.min_opt().unwrap(), &1);
    assert_eq!(int32_stats.max_opt().unwrap(), &4);
}
2122
/// Min/max statistics of a decimal byte array column must follow signed ordering:
/// the value with the leading `0xFF` bytes and the smallest tail is the minimum, the
/// non-zero value with leading zero bytes is the maximum.
#[test]
fn test_column_writer_check_byte_array_min_max() {
    // Four 16-byte values, listed from smallest to largest under signed ordering.
    let most_negative: [u8; 16] = [
        255, 255, 255, 255, 255, 255, 255, 255, 179, 172, 19, 35, 231, 90, 0, 0,
    ];
    let less_negative: [u8; 16] = [
        255, 255, 255, 255, 255, 255, 255, 255, 228, 62, 146, 152, 177, 56, 0, 0,
    ];
    let zero = [0u8; 16];
    let positive: [u8; 16] = [0, 0, 0, 0, 0, 0, 0, 0, 41, 162, 36, 26, 246, 44, 0, 0];
    let decimal = |bytes: [u8; 16]| ByteArray::from(bytes.to_vec());

    let mut writer = get_test_decimals_column_writer::<ByteArrayType>(
        get_test_page_writer(),
        0,
        0,
        Default::default(),
    );
    writer
        .write_batch(
            &[
                decimal(most_negative),
                decimal(less_negative),
                decimal(zero),
                decimal(positive),
            ],
            None,
            None,
        )
        .unwrap();

    let metadata = writer.close().unwrap().metadata;
    let Some(stats) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    let Statistics::ByteArray(byte_array_stats) = stats else {
        panic!("expecting Statistics::ByteArray");
    };
    assert_eq!(byte_array_stats.min_opt().unwrap(), &decimal(most_negative));
    assert_eq!(byte_array_stats.max_opt().unwrap(), &decimal(positive));
}
2176
/// Min/max statistics for an `INT32` column created through the
/// unsigned-converted-type test writer.
#[test]
fn test_column_writer_uint32_converted_type_min_max() {
    let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
        get_test_page_writer(),
        0,
        0,
        Default::default(),
    );
    writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let Some(stats) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    let Statistics::Int32(int32_stats) = stats else {
        panic!("expecting Statistics::Int32");
    };
    assert_eq!(int32_stats.min_opt().unwrap(), &0);
    assert_eq!(int32_stats.max_opt().unwrap(), &5);
}
2200
/// Statistics handed to `write_batch_with_statistics` must show up in the chunk
/// metadata, even though they do not match the written values.
#[test]
fn test_column_writer_precalculated_statistics() {
    let chunk_stats_props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();
    let mut writer = get_test_column_writer::<Int32Type>(
        get_test_page_writer(),
        0,
        0,
        Arc::new(chunk_stats_props),
    );
    // Supplied min, max and distinct count deliberately differ from the data.
    writer
        .write_batch_with_statistics(
            &[1, 2, 3, 4],
            None,
            None,
            Some(&-17),
            Some(&9000),
            Some(55),
        )
        .unwrap();

    let closed = writer.close().unwrap();
    assert_eq!(closed.bytes_written, 20);
    assert_eq!(closed.rows_written, 4);

    let metadata = closed.metadata;
    let encodings: Vec<_> = metadata.encodings().collect();
    assert_eq!(
        encodings,
        vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
    );
    assert_eq!(metadata.num_values(), 4);
    assert_eq!(metadata.compressed_size(), 20);
    assert_eq!(metadata.uncompressed_size(), 20);
    assert_eq!(metadata.data_page_offset(), 0);
    assert_eq!(metadata.dictionary_page_offset(), Some(0));

    let Some(stats) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);

    let Statistics::Int32(int32_stats) = stats else {
        panic!("expecting Statistics::Int32");
    };
    assert_eq!(int32_stats.min_opt().unwrap(), &-17);
    assert_eq!(int32_stats.max_opt().unwrap(), &9000);
}
2248
/// Mixes a plain `write_batch` with a `write_batch_with_statistics` call and checks
/// the resulting chunk-level and page-level statistics: min/max cover both batches
/// and no distinct count is reported.
#[test]
fn test_mixed_precomputed_statistics() {
    let mut buffer = Vec::with_capacity(100);
    let mut sink = TrackedWrite::new(&mut buffer);
    let page_writer = Box::new(SerializedPageWriter::new(&mut sink));
    let writer_props = WriterProperties::builder()
        .set_write_page_header_statistics(true)
        .build();
    let mut writer =
        get_test_column_writer::<Int32Type>(page_writer, 0, 0, Arc::new(writer_props));

    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer
        .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
        .unwrap();

    let closed = writer.close().unwrap();

    let chunk_stats = closed.metadata.statistics().unwrap();
    assert_eq!(chunk_stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
    assert_eq!(chunk_stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert!(chunk_stats.distinct_count_opt().is_none());

    // Release the borrow on `buffer` so the written bytes can be read back.
    drop(sink);

    let reader_props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .set_read_page_statistics(true)
        .build();
    let page_reader = SerializedPageReader::new_with_properties(
        Arc::new(Bytes::from(buffer)),
        &closed.metadata,
        closed.rows_written as usize,
        None,
        Arc::new(reader_props),
    )
    .unwrap();

    let pages = page_reader.collect::<Result<Vec<_>>>().unwrap();
    let page_types: Vec<_> = pages.iter().map(|page| page.page_type()).collect();
    assert_eq!(
        page_types,
        [PageType::DICTIONARY_PAGE, PageType::DATA_PAGE]
    );

    let data_page_stats = pages[1].statistics().unwrap();
    assert_eq!(
        data_page_stats.min_bytes_opt().unwrap(),
        1_i32.to_le_bytes()
    );
    assert_eq!(
        data_page_stats.max_bytes_opt().unwrap(),
        7_i32.to_le_bytes()
    );
    assert_eq!(data_page_stats.null_count_opt(), Some(0));
    assert!(data_page_stats.distinct_count_opt().is_none());
}
2307
/// With statistics disabled, neither the chunk metadata nor the v2 data page carries
/// statistics, while the page's value, null and row counts are still populated.
#[test]
fn test_disabled_statistics() {
    let mut buffer = Vec::with_capacity(100);
    let mut sink = TrackedWrite::new(&mut buffer);
    let page_writer = Box::new(SerializedPageWriter::new(&mut sink));
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build(),
    );

    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, writer_props);
    // Six levels, two of them below the max definition level.
    writer
        .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
        .unwrap();

    let closed = writer.close().unwrap();
    assert!(closed.metadata.statistics().is_none());

    // Release the borrow on `buffer` so the written bytes can be read back.
    drop(sink);

    let reader_props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .build();
    let page_reader = SerializedPageReader::new_with_properties(
        Arc::new(Bytes::from(buffer)),
        &closed.metadata,
        closed.rows_written as usize,
        None,
        Arc::new(reader_props),
    )
    .unwrap();

    let pages = page_reader.collect::<Result<Vec<_>>>().unwrap();
    let page_types: Vec<_> = pages.iter().map(|page| page.page_type()).collect();
    assert_eq!(
        page_types,
        [PageType::DICTIONARY_PAGE, PageType::DATA_PAGE_V2]
    );

    let Page::DataPageV2 {
        num_values,
        num_nulls,
        num_rows,
        statistics,
        ..
    } = &pages[1]
    else {
        unreachable!()
    };
    assert_eq!(*num_values, 6);
    assert_eq!(*num_nulls, 2);
    assert_eq!(*num_rows, 6);
    assert!(statistics.is_none());
}
2363
2364 #[test]
2365 fn test_column_writer_empty_column_roundtrip() {
2366 let props = Default::default();
2367 column_roundtrip::<Int32Type>(props, &[], None, None);
2368 }
2369
2370 #[test]
2371 fn test_column_writer_non_nullable_values_roundtrip() {
2372 let props = Default::default();
2373 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2374 }
2375
2376 #[test]
2377 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2378 let props = Default::default();
2379 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2380 }
2381
2382 #[test]
2383 fn test_column_writer_nullable_repeated_values_roundtrip() {
2384 let props = Default::default();
2385 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2386 }
2387
2388 #[test]
2389 fn test_column_writer_dictionary_fallback_small_data_page() {
2390 let props = WriterProperties::builder()
2391 .set_dictionary_page_size_limit(32)
2392 .set_data_page_size_limit(32)
2393 .build();
2394 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2395 }
2396
2397 #[test]
2398 fn test_column_writer_small_write_batch_size() {
2399 for i in &[1usize, 2, 5, 10, 11, 1023] {
2400 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2401
2402 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2403 }
2404 }
2405
2406 #[test]
2407 fn test_column_writer_dictionary_disabled_v1() {
2408 let props = WriterProperties::builder()
2409 .set_writer_version(WriterVersion::PARQUET_1_0)
2410 .set_dictionary_enabled(false)
2411 .build();
2412 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2413 }
2414
2415 #[test]
2416 fn test_column_writer_dictionary_disabled_v2() {
2417 let props = WriterProperties::builder()
2418 .set_writer_version(WriterVersion::PARQUET_2_0)
2419 .set_dictionary_enabled(false)
2420 .build();
2421 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2422 }
2423
2424 #[test]
2425 fn test_column_writer_compression_v1() {
2426 let props = WriterProperties::builder()
2427 .set_writer_version(WriterVersion::PARQUET_1_0)
2428 .set_compression(Compression::SNAPPY)
2429 .build();
2430 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2431 }
2432
2433 #[test]
2434 fn test_column_writer_compression_v2() {
2435 let props = WriterProperties::builder()
2436 .set_writer_version(WriterVersion::PARQUET_2_0)
2437 .set_compression(Compression::SNAPPY)
2438 .build();
2439 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2440 }
2441
2442 #[test]
2443 fn test_column_writer_add_data_pages_with_dict() {
2444 let mut file = tempfile::tempfile().unwrap();
2447 let mut write = TrackedWrite::new(&mut file);
2448 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2449 let props = Arc::new(
2450 WriterProperties::builder()
2451 .set_data_page_size_limit(10)
2452 .set_write_batch_size(3) .build(),
2454 );
2455 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2456 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2457 writer.write_batch(data, None, None).unwrap();
2458 let r = writer.close().unwrap();
2459
2460 drop(write);
2461
2462 let props = ReaderProperties::builder()
2464 .set_backward_compatible_lz4(false)
2465 .build();
2466 let mut page_reader = Box::new(
2467 SerializedPageReader::new_with_properties(
2468 Arc::new(file),
2469 &r.metadata,
2470 r.rows_written as usize,
2471 None,
2472 Arc::new(props),
2473 )
2474 .unwrap(),
2475 );
2476 let mut res = Vec::new();
2477 while let Some(page) = page_reader.get_next_page().unwrap() {
2478 res.push((page.page_type(), page.num_values(), page.buffer().len()));
2479 }
2480 assert_eq!(
2481 res,
2482 vec![
2483 (PageType::DICTIONARY_PAGE, 10, 40),
2484 (PageType::DATA_PAGE, 9, 10),
2485 (PageType::DATA_PAGE, 1, 3),
2486 ]
2487 );
2488 assert_eq!(
2489 r.metadata.page_encoding_stats(),
2490 Some(&vec![
2491 PageEncodingStats {
2492 page_type: PageType::DICTIONARY_PAGE,
2493 encoding: Encoding::PLAIN,
2494 count: 1
2495 },
2496 PageEncodingStats {
2497 page_type: PageType::DATA_PAGE,
2498 encoding: Encoding::RLE_DICTIONARY,
2499 count: 2,
2500 }
2501 ])
2502 );
2503 }
2504
2505 #[test]
2506 fn test_bool_statistics() {
2507 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2508 assert!(!stats.is_min_max_backwards_compatible());
2511 if let Statistics::Boolean(stats) = stats {
2512 assert_eq!(stats.min_opt().unwrap(), &false);
2513 assert_eq!(stats.max_opt().unwrap(), &true);
2514 } else {
2515 panic!("expecting Statistics::Boolean, got {stats:?}");
2516 }
2517 }
2518
2519 #[test]
2520 fn test_int32_statistics() {
2521 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2522 assert!(stats.is_min_max_backwards_compatible());
2523 if let Statistics::Int32(stats) = stats {
2524 assert_eq!(stats.min_opt().unwrap(), &-2);
2525 assert_eq!(stats.max_opt().unwrap(), &3);
2526 } else {
2527 panic!("expecting Statistics::Int32, got {stats:?}");
2528 }
2529 }
2530
2531 #[test]
2532 fn test_int64_statistics() {
2533 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2534 assert!(stats.is_min_max_backwards_compatible());
2535 if let Statistics::Int64(stats) = stats {
2536 assert_eq!(stats.min_opt().unwrap(), &-2);
2537 assert_eq!(stats.max_opt().unwrap(), &3);
2538 } else {
2539 panic!("expecting Statistics::Int64, got {stats:?}");
2540 }
2541 }
2542
2543 #[test]
2544 fn test_int96_statistics() {
2545 let input = vec![
2546 Int96::from(vec![1, 20, 30]),
2547 Int96::from(vec![3, 20, 10]),
2548 Int96::from(vec![0, 20, 30]),
2549 Int96::from(vec![2, 20, 30]),
2550 ]
2551 .into_iter()
2552 .collect::<Vec<Int96>>();
2553
2554 let stats = statistics_roundtrip::<Int96Type>(&input);
2555 assert!(!stats.is_min_max_backwards_compatible());
2556 if let Statistics::Int96(stats) = stats {
2557 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2558 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2559 } else {
2560 panic!("expecting Statistics::Int96, got {stats:?}");
2561 }
2562 }
2563
2564 #[test]
2565 fn test_float_statistics() {
2566 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2567 assert!(stats.is_min_max_backwards_compatible());
2568 if let Statistics::Float(stats) = stats {
2569 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2570 assert_eq!(stats.max_opt().unwrap(), &3.0);
2571 } else {
2572 panic!("expecting Statistics::Float, got {stats:?}");
2573 }
2574 }
2575
2576 #[test]
2577 fn test_double_statistics() {
2578 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2579 assert!(stats.is_min_max_backwards_compatible());
2580 if let Statistics::Double(stats) = stats {
2581 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2582 assert_eq!(stats.max_opt().unwrap(), &3.0);
2583 } else {
2584 panic!("expecting Statistics::Double, got {stats:?}");
2585 }
2586 }
2587
2588 #[test]
2589 fn test_byte_array_statistics() {
2590 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2591 .iter()
2592 .map(|&s| s.into())
2593 .collect::<Vec<_>>();
2594
2595 let stats = statistics_roundtrip::<ByteArrayType>(&input);
2596 assert!(!stats.is_min_max_backwards_compatible());
2597 if let Statistics::ByteArray(stats) = stats {
2598 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2599 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2600 } else {
2601 panic!("expecting Statistics::ByteArray, got {stats:?}");
2602 }
2603 }
2604
2605 #[test]
2606 fn test_fixed_len_byte_array_statistics() {
2607 let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
2608 .iter()
2609 .map(|&s| ByteArray::from(s).into())
2610 .collect::<Vec<_>>();
2611
2612 let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2613 assert!(!stats.is_min_max_backwards_compatible());
2614 if let Statistics::FixedLenByteArray(stats) = stats {
2615 let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
2616 assert_eq!(stats.min_opt().unwrap(), &expected_min);
2617 let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
2618 assert_eq!(stats.max_opt().unwrap(), &expected_max);
2619 } else {
2620 panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2621 }
2622 }
2623
2624 #[test]
2625 fn test_column_writer_check_float16_min_max() {
2626 let input = [
2627 -f16::ONE,
2628 f16::from_f32(3.0),
2629 -f16::from_f32(2.0),
2630 f16::from_f32(2.0),
2631 ]
2632 .into_iter()
2633 .map(|s| ByteArray::from(s).into())
2634 .collect::<Vec<_>>();
2635
2636 let stats = float16_statistics_roundtrip(&input);
2637 assert!(stats.is_min_max_backwards_compatible());
2638 assert_eq!(
2639 stats.min_opt().unwrap(),
2640 &ByteArray::from(-f16::from_f32(2.0))
2641 );
2642 assert_eq!(
2643 stats.max_opt().unwrap(),
2644 &ByteArray::from(f16::from_f32(3.0))
2645 );
2646 }
2647
2648 #[test]
2649 fn test_column_writer_check_float16_nan_middle() {
2650 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2651 .into_iter()
2652 .map(|s| ByteArray::from(s).into())
2653 .collect::<Vec<_>>();
2654
2655 let stats = float16_statistics_roundtrip(&input);
2656 assert!(stats.is_min_max_backwards_compatible());
2657 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2658 assert_eq!(
2659 stats.max_opt().unwrap(),
2660 &ByteArray::from(f16::ONE + f16::ONE)
2661 );
2662 }
2663
2664 #[test]
2665 fn test_float16_statistics_nan_middle() {
2666 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2667 .into_iter()
2668 .map(|s| ByteArray::from(s).into())
2669 .collect::<Vec<_>>();
2670
2671 let stats = float16_statistics_roundtrip(&input);
2672 assert!(stats.is_min_max_backwards_compatible());
2673 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2674 assert_eq!(
2675 stats.max_opt().unwrap(),
2676 &ByteArray::from(f16::ONE + f16::ONE)
2677 );
2678 }
2679
2680 #[test]
2681 fn test_float16_statistics_nan_start() {
2682 let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2683 .into_iter()
2684 .map(|s| ByteArray::from(s).into())
2685 .collect::<Vec<_>>();
2686
2687 let stats = float16_statistics_roundtrip(&input);
2688 assert!(stats.is_min_max_backwards_compatible());
2689 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2690 assert_eq!(
2691 stats.max_opt().unwrap(),
2692 &ByteArray::from(f16::ONE + f16::ONE)
2693 );
2694 }
2695
2696 #[test]
2697 fn test_float16_statistics_nan_only() {
2698 let input = [f16::NAN, f16::NAN]
2699 .into_iter()
2700 .map(|s| ByteArray::from(s).into())
2701 .collect::<Vec<_>>();
2702
2703 let stats = float16_statistics_roundtrip(&input);
2704 assert!(stats.min_bytes_opt().is_none());
2705 assert!(stats.max_bytes_opt().is_none());
2706 assert!(stats.is_min_max_backwards_compatible());
2707 }
2708
2709 #[test]
2710 fn test_float16_statistics_zero_only() {
2711 let input = [f16::ZERO]
2712 .into_iter()
2713 .map(|s| ByteArray::from(s).into())
2714 .collect::<Vec<_>>();
2715
2716 let stats = float16_statistics_roundtrip(&input);
2717 assert!(stats.is_min_max_backwards_compatible());
2718 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2719 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2720 }
2721
2722 #[test]
2723 fn test_float16_statistics_neg_zero_only() {
2724 let input = [f16::NEG_ZERO]
2725 .into_iter()
2726 .map(|s| ByteArray::from(s).into())
2727 .collect::<Vec<_>>();
2728
2729 let stats = float16_statistics_roundtrip(&input);
2730 assert!(stats.is_min_max_backwards_compatible());
2731 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2732 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2733 }
2734
2735 #[test]
2736 fn test_float16_statistics_zero_min() {
2737 let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2738 .into_iter()
2739 .map(|s| ByteArray::from(s).into())
2740 .collect::<Vec<_>>();
2741
2742 let stats = float16_statistics_roundtrip(&input);
2743 assert!(stats.is_min_max_backwards_compatible());
2744 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2745 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2746 }
2747
2748 #[test]
2749 fn test_float16_statistics_neg_zero_max() {
2750 let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2751 .into_iter()
2752 .map(|s| ByteArray::from(s).into())
2753 .collect::<Vec<_>>();
2754
2755 let stats = float16_statistics_roundtrip(&input);
2756 assert!(stats.is_min_max_backwards_compatible());
2757 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2758 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2759 }
2760
2761 #[test]
2762 fn test_float_statistics_nan_middle() {
2763 let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2764 assert!(stats.is_min_max_backwards_compatible());
2765 if let Statistics::Float(stats) = stats {
2766 assert_eq!(stats.min_opt().unwrap(), &1.0);
2767 assert_eq!(stats.max_opt().unwrap(), &2.0);
2768 } else {
2769 panic!("expecting Statistics::Float");
2770 }
2771 }
2772
2773 #[test]
2774 fn test_float_statistics_nan_start() {
2775 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2776 assert!(stats.is_min_max_backwards_compatible());
2777 if let Statistics::Float(stats) = stats {
2778 assert_eq!(stats.min_opt().unwrap(), &1.0);
2779 assert_eq!(stats.max_opt().unwrap(), &2.0);
2780 } else {
2781 panic!("expecting Statistics::Float");
2782 }
2783 }
2784
2785 #[test]
2786 fn test_float_statistics_nan_only() {
2787 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2788 assert!(stats.min_bytes_opt().is_none());
2789 assert!(stats.max_bytes_opt().is_none());
2790 assert!(stats.is_min_max_backwards_compatible());
2791 assert!(matches!(stats, Statistics::Float(_)));
2792 }
2793
2794 #[test]
2795 fn test_float_statistics_zero_only() {
2796 let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2797 assert!(stats.is_min_max_backwards_compatible());
2798 if let Statistics::Float(stats) = stats {
2799 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2800 assert!(stats.min_opt().unwrap().is_sign_negative());
2801 assert_eq!(stats.max_opt().unwrap(), &0.0);
2802 assert!(stats.max_opt().unwrap().is_sign_positive());
2803 } else {
2804 panic!("expecting Statistics::Float");
2805 }
2806 }
2807
2808 #[test]
2809 fn test_float_statistics_neg_zero_only() {
2810 let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2811 assert!(stats.is_min_max_backwards_compatible());
2812 if let Statistics::Float(stats) = stats {
2813 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2814 assert!(stats.min_opt().unwrap().is_sign_negative());
2815 assert_eq!(stats.max_opt().unwrap(), &0.0);
2816 assert!(stats.max_opt().unwrap().is_sign_positive());
2817 } else {
2818 panic!("expecting Statistics::Float");
2819 }
2820 }
2821
2822 #[test]
2823 fn test_float_statistics_zero_min() {
2824 let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2825 assert!(stats.is_min_max_backwards_compatible());
2826 if let Statistics::Float(stats) = stats {
2827 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2828 assert!(stats.min_opt().unwrap().is_sign_negative());
2829 assert_eq!(stats.max_opt().unwrap(), &2.0);
2830 } else {
2831 panic!("expecting Statistics::Float");
2832 }
2833 }
2834
2835 #[test]
2836 fn test_float_statistics_neg_zero_max() {
2837 let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2838 assert!(stats.is_min_max_backwards_compatible());
2839 if let Statistics::Float(stats) = stats {
2840 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2841 assert_eq!(stats.max_opt().unwrap(), &0.0);
2842 assert!(stats.max_opt().unwrap().is_sign_positive());
2843 } else {
2844 panic!("expecting Statistics::Float");
2845 }
2846 }
2847
2848 #[test]
2849 fn test_double_statistics_nan_middle() {
2850 let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2851 assert!(stats.is_min_max_backwards_compatible());
2852 if let Statistics::Double(stats) = stats {
2853 assert_eq!(stats.min_opt().unwrap(), &1.0);
2854 assert_eq!(stats.max_opt().unwrap(), &2.0);
2855 } else {
2856 panic!("expecting Statistics::Double");
2857 }
2858 }
2859
2860 #[test]
2861 fn test_double_statistics_nan_start() {
2862 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2863 assert!(stats.is_min_max_backwards_compatible());
2864 if let Statistics::Double(stats) = stats {
2865 assert_eq!(stats.min_opt().unwrap(), &1.0);
2866 assert_eq!(stats.max_opt().unwrap(), &2.0);
2867 } else {
2868 panic!("expecting Statistics::Double");
2869 }
2870 }
2871
2872 #[test]
2873 fn test_double_statistics_nan_only() {
2874 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2875 assert!(stats.min_bytes_opt().is_none());
2876 assert!(stats.max_bytes_opt().is_none());
2877 assert!(matches!(stats, Statistics::Double(_)));
2878 assert!(stats.is_min_max_backwards_compatible());
2879 }
2880
2881 #[test]
2882 fn test_double_statistics_zero_only() {
2883 let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2884 assert!(stats.is_min_max_backwards_compatible());
2885 if let Statistics::Double(stats) = stats {
2886 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2887 assert!(stats.min_opt().unwrap().is_sign_negative());
2888 assert_eq!(stats.max_opt().unwrap(), &0.0);
2889 assert!(stats.max_opt().unwrap().is_sign_positive());
2890 } else {
2891 panic!("expecting Statistics::Double");
2892 }
2893 }
2894
2895 #[test]
2896 fn test_double_statistics_neg_zero_only() {
2897 let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2898 assert!(stats.is_min_max_backwards_compatible());
2899 if let Statistics::Double(stats) = stats {
2900 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2901 assert!(stats.min_opt().unwrap().is_sign_negative());
2902 assert_eq!(stats.max_opt().unwrap(), &0.0);
2903 assert!(stats.max_opt().unwrap().is_sign_positive());
2904 } else {
2905 panic!("expecting Statistics::Double");
2906 }
2907 }
2908
2909 #[test]
2910 fn test_double_statistics_zero_min() {
2911 let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2912 assert!(stats.is_min_max_backwards_compatible());
2913 if let Statistics::Double(stats) = stats {
2914 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2915 assert!(stats.min_opt().unwrap().is_sign_negative());
2916 assert_eq!(stats.max_opt().unwrap(), &2.0);
2917 } else {
2918 panic!("expecting Statistics::Double");
2919 }
2920 }
2921
2922 #[test]
2923 fn test_double_statistics_neg_zero_max() {
2924 let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2925 assert!(stats.is_min_max_backwards_compatible());
2926 if let Statistics::Double(stats) = stats {
2927 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2928 assert_eq!(stats.max_opt().unwrap(), &0.0);
2929 assert!(stats.max_opt().unwrap().is_sign_positive());
2930 } else {
2931 panic!("expecting Statistics::Double");
2932 }
2933 }
2934
2935 #[test]
2936 fn test_compare_greater_byte_array_decimals() {
2937 assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2938 assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2939 assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2940 assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2941 assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2942 assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2943 assert!(!compare_greater_byte_array_decimals(
2944 &[0u8, 1u8,],
2945 &[1u8, 0u8,],
2946 ),);
2947 assert!(!compare_greater_byte_array_decimals(
2948 &[255u8, 35u8, 0u8, 0u8,],
2949 &[0u8,],
2950 ),);
2951 assert!(compare_greater_byte_array_decimals(
2952 &[0u8,],
2953 &[255u8, 35u8, 0u8, 0u8,],
2954 ),);
2955 }
2956
2957 #[test]
2958 fn test_column_index_with_null_pages() {
2959 let page_writer = get_test_page_writer();
2961 let props = Default::default();
2962 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2963 writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2964
2965 let r = writer.close().unwrap();
2966 assert!(r.column_index.is_some());
2967 let col_idx = r.column_index.unwrap();
2968 let col_idx = match col_idx {
2969 ColumnIndexMetaData::INT32(col_idx) => col_idx,
2970 _ => panic!("wrong stats type"),
2971 };
2972 assert!(col_idx.is_null_page(0));
2974 assert!(col_idx.min_value(0).is_none());
2976 assert!(col_idx.max_value(0).is_none());
2977 assert!(col_idx.null_count(0).is_some());
2979 assert_eq!(col_idx.null_count(0), Some(4));
2980 assert!(col_idx.repetition_level_histogram(0).is_none());
2982 assert!(col_idx.definition_level_histogram(0).is_some());
2984 assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
2985 }
2986
2987 #[test]
2988 fn test_column_offset_index_metadata() {
2989 let page_writer = get_test_page_writer();
2992 let props = Default::default();
2993 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2994 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2995 writer.flush_data_pages().unwrap();
2997 writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2999
3000 let r = writer.close().unwrap();
3001 let column_index = r.column_index.unwrap();
3002 let offset_index = r.offset_index.unwrap();
3003
3004 assert_eq!(8, r.rows_written);
3005
3006 let column_index = match column_index {
3008 ColumnIndexMetaData::INT32(column_index) => column_index,
3009 _ => panic!("wrong stats type"),
3010 };
3011 assert_eq!(2, column_index.num_pages());
3012 assert_eq!(2, offset_index.page_locations.len());
3013 assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3014 for idx in 0..2 {
3015 assert!(!column_index.is_null_page(idx));
3016 assert_eq!(0, column_index.null_count(0).unwrap());
3017 }
3018
3019 if let Some(stats) = r.metadata.statistics() {
3020 assert_eq!(stats.null_count_opt(), Some(0));
3021 assert_eq!(stats.distinct_count_opt(), None);
3022 if let Statistics::Int32(stats) = stats {
3023 assert_eq!(stats.min_opt(), column_index.min_value(1));
3027 assert_eq!(stats.max_opt(), column_index.max_value(1));
3028 } else {
3029 panic!("expecting Statistics::Int32");
3030 }
3031 } else {
3032 panic!("metadata missing statistics");
3033 }
3034
3035 assert_eq!(0, offset_index.page_locations[0].first_row_index);
3037 assert_eq!(4, offset_index.page_locations[1].first_row_index);
3038 }
3039
3040 #[test]
3042 fn test_column_offset_index_metadata_truncating() {
3043 let page_writer = get_test_page_writer();
3046 let props = WriterProperties::builder()
3047 .set_statistics_truncate_length(None) .build()
3049 .into();
3050 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3051
3052 let mut data = vec![FixedLenByteArray::default(); 3];
3053 data[0].set_data(Bytes::from(vec![97_u8; 200]));
3055 data[1].set_data(Bytes::from(vec![112_u8; 200]));
3057 data[2].set_data(Bytes::from(vec![98_u8; 200]));
3058
3059 writer.write_batch(&data, None, None).unwrap();
3060
3061 writer.flush_data_pages().unwrap();
3062
3063 let r = writer.close().unwrap();
3064 let column_index = r.column_index.unwrap();
3065 let offset_index = r.offset_index.unwrap();
3066
3067 let column_index = match column_index {
3068 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3069 _ => panic!("wrong stats type"),
3070 };
3071
3072 assert_eq!(3, r.rows_written);
3073
3074 assert_eq!(1, column_index.num_pages());
3076 assert_eq!(1, offset_index.page_locations.len());
3077 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3078 assert!(!column_index.is_null_page(0));
3079 assert_eq!(Some(0), column_index.null_count(0));
3080
3081 if let Some(stats) = r.metadata.statistics() {
3082 assert_eq!(stats.null_count_opt(), Some(0));
3083 assert_eq!(stats.distinct_count_opt(), None);
3084 if let Statistics::FixedLenByteArray(stats) = stats {
3085 let column_index_min_value = column_index.min_value(0).unwrap();
3086 let column_index_max_value = column_index.max_value(0).unwrap();
3087
3088 assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
3090 assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);
3091
3092 assert_eq!(
3093 column_index_min_value.len(),
3094 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3095 );
3096 assert_eq!(column_index_min_value, &[97_u8; 64]);
3097 assert_eq!(
3098 column_index_max_value.len(),
3099 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3100 );
3101
3102 assert_eq!(
3104 *column_index_max_value.last().unwrap(),
3105 *column_index_max_value.first().unwrap() + 1
3106 );
3107 } else {
3108 panic!("expecting Statistics::FixedLenByteArray");
3109 }
3110 } else {
3111 panic!("metadata missing statistics");
3112 }
3113 }
3114
3115 #[test]
3116 fn test_column_offset_index_truncating_spec_example() {
3117 let page_writer = get_test_page_writer();
3120
3121 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3123 let props = Arc::new(builder.build());
3124 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3125
3126 let mut data = vec![FixedLenByteArray::default(); 1];
3127 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3129
3130 writer.write_batch(&data, None, None).unwrap();
3131
3132 writer.flush_data_pages().unwrap();
3133
3134 let r = writer.close().unwrap();
3135 let column_index = r.column_index.unwrap();
3136 let offset_index = r.offset_index.unwrap();
3137
3138 let column_index = match column_index {
3139 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3140 _ => panic!("wrong stats type"),
3141 };
3142
3143 assert_eq!(1, r.rows_written);
3144
3145 assert_eq!(1, column_index.num_pages());
3147 assert_eq!(1, offset_index.page_locations.len());
3148 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3149 assert!(!column_index.is_null_page(0));
3150 assert_eq!(Some(0), column_index.null_count(0));
3151
3152 if let Some(stats) = r.metadata.statistics() {
3153 assert_eq!(stats.null_count_opt(), Some(0));
3154 assert_eq!(stats.distinct_count_opt(), None);
3155 if let Statistics::FixedLenByteArray(_stats) = stats {
3156 let column_index_min_value = column_index.min_value(0).unwrap();
3157 let column_index_max_value = column_index.max_value(0).unwrap();
3158
3159 assert_eq!(column_index_min_value.len(), 1);
3160 assert_eq!(column_index_max_value.len(), 1);
3161
3162 assert_eq!("B".as_bytes(), column_index_min_value);
3163 assert_eq!("C".as_bytes(), column_index_max_value);
3164
3165 assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3166 assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3167 } else {
3168 panic!("expecting Statistics::FixedLenByteArray");
3169 }
3170 } else {
3171 panic!("metadata missing statistics");
3172 }
3173 }
3174
3175 #[test]
3176 fn test_float16_min_max_no_truncation() {
3177 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3179 let props = Arc::new(builder.build());
3180 let page_writer = get_test_page_writer();
3181 let mut writer = get_test_float16_column_writer(page_writer, props);
3182
3183 let expected_value = f16::PI.to_le_bytes().to_vec();
3184 let data = vec![ByteArray::from(expected_value.clone()).into()];
3185 writer.write_batch(&data, None, None).unwrap();
3186 writer.flush_data_pages().unwrap();
3187
3188 let r = writer.close().unwrap();
3189
3190 let column_index = r.column_index.unwrap();
3193 let column_index = match column_index {
3194 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3195 _ => panic!("wrong stats type"),
3196 };
3197 let column_index_min_bytes = column_index.min_value(0).unwrap();
3198 let column_index_max_bytes = column_index.max_value(0).unwrap();
3199 assert_eq!(expected_value, column_index_min_bytes);
3200 assert_eq!(expected_value, column_index_max_bytes);
3201
3202 let stats = r.metadata.statistics().unwrap();
3204 if let Statistics::FixedLenByteArray(stats) = stats {
3205 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3206 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3207 assert_eq!(expected_value, stats_min_bytes);
3208 assert_eq!(expected_value, stats_max_bytes);
3209 } else {
3210 panic!("expecting Statistics::FixedLenByteArray");
3211 }
3212 }
3213
3214 #[test]
3215 fn test_decimal_min_max_no_truncation() {
3216 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3218 let props = Arc::new(builder.build());
3219 let page_writer = get_test_page_writer();
3220 let mut writer =
3221 get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3222
3223 let expected_value = vec![
3224 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3225 231u8, 90u8, 0u8, 0u8,
3226 ];
3227 let data = vec![ByteArray::from(expected_value.clone()).into()];
3228 writer.write_batch(&data, None, None).unwrap();
3229 writer.flush_data_pages().unwrap();
3230
3231 let r = writer.close().unwrap();
3232
3233 let column_index = r.column_index.unwrap();
3236 let column_index = match column_index {
3237 ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
3238 _ => panic!("wrong stats type"),
3239 };
3240 let column_index_min_bytes = column_index.min_value(0).unwrap();
3241 let column_index_max_bytes = column_index.max_value(0).unwrap();
3242 assert_eq!(expected_value, column_index_min_bytes);
3243 assert_eq!(expected_value, column_index_max_bytes);
3244
3245 let stats = r.metadata.statistics().unwrap();
3247 if let Statistics::FixedLenByteArray(stats) = stats {
3248 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3249 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3250 assert_eq!(expected_value, stats_min_bytes);
3251 assert_eq!(expected_value, stats_max_bytes);
3252 } else {
3253 panic!("expecting Statistics::FixedLenByteArray");
3254 }
3255 }
3256
3257 #[test]
3258 fn test_statistics_truncating_byte_array_default() {
3259 let page_writer = get_test_page_writer();
3260
3261 let props = WriterProperties::builder().build().into();
3263 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3264
3265 let mut data = vec![ByteArray::default(); 1];
3266 data[0].set_data(Bytes::from(String::from(
3267 "This string is longer than 64 bytes, so it will almost certainly be truncated.",
3268 )));
3269 writer.write_batch(&data, None, None).unwrap();
3270 writer.flush_data_pages().unwrap();
3271
3272 let r = writer.close().unwrap();
3273
3274 assert_eq!(1, r.rows_written);
3275
3276 let stats = r.metadata.statistics().expect("statistics");
3277 if let Statistics::ByteArray(_stats) = stats {
3278 let min_value = _stats.min_opt().unwrap();
3279 let max_value = _stats.max_opt().unwrap();
3280
3281 assert!(!_stats.min_is_exact());
3282 assert!(!_stats.max_is_exact());
3283
3284 let expected_len = 64;
3285 assert_eq!(min_value.len(), expected_len);
3286 assert_eq!(max_value.len(), expected_len);
3287
3288 let expected_min =
3289 "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
3290 assert_eq!(expected_min, min_value.as_bytes());
3291 let expected_max =
3293 "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
3294 assert_eq!(expected_max, max_value.as_bytes());
3295 } else {
3296 panic!("expecting Statistics::ByteArray");
3297 }
3298 }
3299
3300 #[test]
3301 fn test_statistics_truncating_byte_array() {
3302 let page_writer = get_test_page_writer();
3303
3304 const TEST_TRUNCATE_LENGTH: usize = 1;
3305
3306 let builder =
3308 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3309 let props = Arc::new(builder.build());
3310 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3311
3312 let mut data = vec![ByteArray::default(); 1];
3313 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3315
3316 writer.write_batch(&data, None, None).unwrap();
3317
3318 writer.flush_data_pages().unwrap();
3319
3320 let r = writer.close().unwrap();
3321
3322 assert_eq!(1, r.rows_written);
3323
3324 let stats = r.metadata.statistics().expect("statistics");
3325 assert_eq!(stats.null_count_opt(), Some(0));
3326 assert_eq!(stats.distinct_count_opt(), None);
3327 if let Statistics::ByteArray(_stats) = stats {
3328 let min_value = _stats.min_opt().unwrap();
3329 let max_value = _stats.max_opt().unwrap();
3330
3331 assert!(!_stats.min_is_exact());
3332 assert!(!_stats.max_is_exact());
3333
3334 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3335 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3336
3337 assert_eq!("B".as_bytes(), min_value.as_bytes());
3338 assert_eq!("C".as_bytes(), max_value.as_bytes());
3339 } else {
3340 panic!("expecting Statistics::ByteArray");
3341 }
3342 }
3343
3344 #[test]
3345 fn test_statistics_truncating_fixed_len_byte_array() {
3346 let page_writer = get_test_page_writer();
3347
3348 const TEST_TRUNCATE_LENGTH: usize = 1;
3349
3350 let builder =
3352 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3353 let props = Arc::new(builder.build());
3354 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3355
3356 let mut data = vec![FixedLenByteArray::default(); 1];
3357
3358 const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3359 const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3360
3361 const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3363 [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3364
3365 data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3367
3368 writer.write_batch(&data, None, None).unwrap();
3369
3370 writer.flush_data_pages().unwrap();
3371
3372 let r = writer.close().unwrap();
3373
3374 assert_eq!(1, r.rows_written);
3375
3376 let stats = r.metadata.statistics().expect("statistics");
3377 assert_eq!(stats.null_count_opt(), Some(0));
3378 assert_eq!(stats.distinct_count_opt(), None);
3379 if let Statistics::FixedLenByteArray(_stats) = stats {
3380 let min_value = _stats.min_opt().unwrap();
3381 let max_value = _stats.max_opt().unwrap();
3382
3383 assert!(!_stats.min_is_exact());
3384 assert!(!_stats.max_is_exact());
3385
3386 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3387 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3388
3389 assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3390 assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3391
3392 let reconstructed_min = i128::from_be_bytes([
3393 min_value.as_bytes()[0],
3394 0,
3395 0,
3396 0,
3397 0,
3398 0,
3399 0,
3400 0,
3401 0,
3402 0,
3403 0,
3404 0,
3405 0,
3406 0,
3407 0,
3408 0,
3409 ]);
3410
3411 let reconstructed_max = i128::from_be_bytes([
3412 max_value.as_bytes()[0],
3413 0,
3414 0,
3415 0,
3416 0,
3417 0,
3418 0,
3419 0,
3420 0,
3421 0,
3422 0,
3423 0,
3424 0,
3425 0,
3426 0,
3427 0,
3428 ]);
3429
3430 println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3432 assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3433 println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3434 assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3435 } else {
3436 panic!("expecting Statistics::FixedLenByteArray");
3437 }
3438 }
3439
3440 #[test]
3441 fn test_send() {
3442 fn test<T: Send>() {}
3443 test::<ColumnWriterImpl<Int32Type>>();
3444 }
3445
3446 #[test]
3447 fn test_increment() {
3448 let v = increment(vec![0, 0, 0]).unwrap();
3449 assert_eq!(&v, &[0, 0, 1]);
3450
3451 let v = increment(vec![0, 255, 255]).unwrap();
3453 assert_eq!(&v, &[1, 0, 0]);
3454
3455 let v = increment(vec![255, 255, 255]);
3457 assert!(v.is_none());
3458 }
3459
3460 #[test]
3461 fn test_increment_utf8() {
3462 let test_inc = |o: &str, expected: &str| {
3463 if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3464 assert_eq!(v, expected);
3466 assert!(*v > *o);
3468 let mut greater = ByteArray::new();
3470 greater.set_data(Bytes::from(v));
3471 let mut original = ByteArray::new();
3472 original.set_data(Bytes::from(o.as_bytes().to_vec()));
3473 assert!(greater > original);
3474 } else {
3475 panic!("Expected incremented UTF8 string to also be valid.");
3476 }
3477 };
3478
3479 test_inc("hello", "hellp");
3481
3482 test_inc("a\u{7f}", "b");
3484
3485 assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3487
3488 test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3490
3491 test_inc("éééé", "éééê");
3493
3494 test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3496
3497 test_inc("a\u{7ff}", "b");
3499
3500 assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3502
3503 test_inc("ࠀࠀ", "ࠀࠁ");
3506
3507 test_inc("a\u{ffff}", "b");
3509
3510 assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3512
3513 test_inc("𐀀𐀀", "𐀀𐀁");
3515
3516 test_inc("a\u{10ffff}", "b");
3518
3519 assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3521
3522 test_inc("a\u{D7FF}", "b");
3525 }
3526
3527 #[test]
3528 fn test_truncate_utf8() {
3529 let data = "❤️🧡💛💚💙💜";
3531 let r = truncate_utf8(data, data.len()).unwrap();
3532 assert_eq!(r.len(), data.len());
3533 assert_eq!(&r, data.as_bytes());
3534
3535 let r = truncate_utf8(data, 13).unwrap();
3537 assert_eq!(r.len(), 10);
3538 assert_eq!(&r, "❤️🧡".as_bytes());
3539
3540 let r = truncate_utf8("\u{0836}", 1);
3542 assert!(r.is_none());
3543
3544 let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3547 assert_eq!(&r, "yyyyyyyz".as_bytes());
3548
3549 let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3551 assert_eq!(&r, "ééê".as_bytes());
3552
3553 let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3555 assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3556
3557 let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3559 assert!(r.is_none());
3560
3561 let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3564 assert_eq!(&r, "ࠀࠁ".as_bytes());
3565
3566 let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3568 assert!(r.is_none());
3569
3570 let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3572 assert_eq!(&r, "𐀀𐀁".as_bytes());
3573
3574 let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3576 assert!(r.is_none());
3577 }
3578
3579 #[test]
3580 fn test_byte_array_truncate_invalid_utf8_statistics() {
3583 let message_type = "
3584 message test_schema {
3585 OPTIONAL BYTE_ARRAY a (UTF8);
3586 }
3587 ";
3588 let schema = Arc::new(parse_message_type(message_type).unwrap());
3589
3590 let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3592 let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3593 let file: File = tempfile::tempfile().unwrap();
3594 let props = Arc::new(
3595 WriterProperties::builder()
3596 .set_statistics_enabled(EnabledStatistics::Chunk)
3597 .set_statistics_truncate_length(Some(8))
3598 .build(),
3599 );
3600
3601 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3602 let mut row_group_writer = writer.next_row_group().unwrap();
3603
3604 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3605 col_writer
3606 .typed::<ByteArrayType>()
3607 .write_batch(&data, Some(&def_levels), None)
3608 .unwrap();
3609 col_writer.close().unwrap();
3610 row_group_writer.close().unwrap();
3611 let file_metadata = writer.close().unwrap();
3612 let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
3613 assert!(!stats.max_is_exact());
3614 assert_eq!(
3617 stats.max_bytes_opt().map(|v| v.to_vec()),
3618 Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3619 );
3620 }
3621
3622 #[test]
3623 fn test_increment_max_binary_chars() {
3624 let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3625 assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3626
3627 let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3628 assert!(incremented.is_none())
3629 }
3630
3631 #[test]
3632 fn test_no_column_index_when_stats_disabled() {
3633 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3637 let props = Arc::new(
3638 WriterProperties::builder()
3639 .set_statistics_enabled(EnabledStatistics::None)
3640 .build(),
3641 );
3642 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3643 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3644
3645 let data = Vec::new();
3646 let def_levels = vec![0; 10];
3647 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3648 writer.flush_data_pages().unwrap();
3649
3650 let column_close_result = writer.close().unwrap();
3651 assert!(column_close_result.offset_index.is_some());
3652 assert!(column_close_result.column_index.is_none());
3653 }
3654
3655 #[test]
3656 fn test_no_offset_index_when_disabled() {
3657 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3659 let props = Arc::new(
3660 WriterProperties::builder()
3661 .set_statistics_enabled(EnabledStatistics::None)
3662 .set_offset_index_disabled(true)
3663 .build(),
3664 );
3665 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3666 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3667
3668 let data = Vec::new();
3669 let def_levels = vec![0; 10];
3670 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3671 writer.flush_data_pages().unwrap();
3672
3673 let column_close_result = writer.close().unwrap();
3674 assert!(column_close_result.offset_index.is_none());
3675 assert!(column_close_result.column_index.is_none());
3676 }
3677
3678 #[test]
3679 fn test_offset_index_overridden() {
3680 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3682 let props = Arc::new(
3683 WriterProperties::builder()
3684 .set_statistics_enabled(EnabledStatistics::Page)
3685 .set_offset_index_disabled(true)
3686 .build(),
3687 );
3688 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3689 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3690
3691 let data = Vec::new();
3692 let def_levels = vec![0; 10];
3693 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3694 writer.flush_data_pages().unwrap();
3695
3696 let column_close_result = writer.close().unwrap();
3697 assert!(column_close_result.offset_index.is_some());
3698 assert!(column_close_result.column_index.is_some());
3699 }
3700
3701 #[test]
3702 fn test_boundary_order() -> Result<()> {
3703 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3704 let column_close_result = write_multiple_pages::<Int32Type>(
3706 &descr,
3707 &[
3708 &[Some(-10), Some(10)],
3709 &[Some(-5), Some(11)],
3710 &[None],
3711 &[Some(-5), Some(11)],
3712 ],
3713 )?;
3714 let boundary_order = column_close_result
3715 .column_index
3716 .unwrap()
3717 .get_boundary_order();
3718 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3719
3720 let column_close_result = write_multiple_pages::<Int32Type>(
3722 &descr,
3723 &[
3724 &[Some(10), Some(11)],
3725 &[Some(5), Some(11)],
3726 &[None],
3727 &[Some(-5), Some(0)],
3728 ],
3729 )?;
3730 let boundary_order = column_close_result
3731 .column_index
3732 .unwrap()
3733 .get_boundary_order();
3734 assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3735
3736 let column_close_result = write_multiple_pages::<Int32Type>(
3738 &descr,
3739 &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3740 )?;
3741 let boundary_order = column_close_result
3742 .column_index
3743 .unwrap()
3744 .get_boundary_order();
3745 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3746
3747 let column_close_result =
3749 write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3750 let boundary_order = column_close_result
3751 .column_index
3752 .unwrap()
3753 .get_boundary_order();
3754 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3755
3756 let column_close_result =
3758 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3759 let boundary_order = column_close_result
3760 .column_index
3761 .unwrap()
3762 .get_boundary_order();
3763 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3764
3765 let column_close_result =
3767 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3768 let boundary_order = column_close_result
3769 .column_index
3770 .unwrap()
3771 .get_boundary_order();
3772 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3773
3774 let column_close_result = write_multiple_pages::<Int32Type>(
3776 &descr,
3777 &[
3778 &[Some(10), Some(11)],
3779 &[Some(11), Some(16)],
3780 &[None],
3781 &[Some(-5), Some(0)],
3782 ],
3783 )?;
3784 let boundary_order = column_close_result
3785 .column_index
3786 .unwrap()
3787 .get_boundary_order();
3788 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3789
3790 let column_close_result = write_multiple_pages::<Int32Type>(
3792 &descr,
3793 &[
3794 &[Some(1), Some(9)],
3795 &[Some(2), Some(8)],
3796 &[None],
3797 &[Some(3), Some(7)],
3798 ],
3799 )?;
3800 let boundary_order = column_close_result
3801 .column_index
3802 .unwrap()
3803 .get_boundary_order();
3804 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3805
3806 Ok(())
3807 }
3808
3809 #[test]
3810 fn test_boundary_order_logical_type() -> Result<()> {
3811 let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3814 let fba_descr = {
3815 let tpe = SchemaType::primitive_type_builder(
3816 "col",
3817 FixedLenByteArrayType::get_physical_type(),
3818 )
3819 .with_length(2)
3820 .build()?;
3821 Arc::new(ColumnDescriptor::new(
3822 Arc::new(tpe),
3823 1,
3824 0,
3825 ColumnPath::from("col"),
3826 ))
3827 };
3828
3829 let values: &[&[Option<FixedLenByteArray>]] = &[
3830 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3831 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3832 &[Some(FixedLenByteArray::from(ByteArray::from(
3833 f16::NEG_ZERO,
3834 )))],
3835 &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3836 ];
3837
3838 let column_close_result =
3840 write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3841 let boundary_order = column_close_result
3842 .column_index
3843 .unwrap()
3844 .get_boundary_order();
3845 assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3846
3847 let column_close_result =
3849 write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3850 let boundary_order = column_close_result
3851 .column_index
3852 .unwrap()
3853 .get_boundary_order();
3854 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3855
3856 Ok(())
3857 }
3858
3859 #[test]
3860 fn test_interval_stats_should_not_have_min_max() {
3861 let input = [
3862 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3863 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3864 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3865 ]
3866 .into_iter()
3867 .map(|s| ByteArray::from(s).into())
3868 .collect::<Vec<_>>();
3869
3870 let page_writer = get_test_page_writer();
3871 let mut writer = get_test_interval_column_writer(page_writer);
3872 writer.write_batch(&input, None, None).unwrap();
3873
3874 let metadata = writer.close().unwrap().metadata;
3875 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3876 stats.clone()
3877 } else {
3878 panic!("metadata missing statistics");
3879 };
3880 assert!(stats.min_bytes_opt().is_none());
3881 assert!(stats.max_bytes_opt().is_none());
3882 }
3883
3884 #[test]
3885 #[cfg(feature = "arrow")]
3886 fn test_column_writer_get_estimated_total_bytes() {
3887 let page_writer = get_test_page_writer();
3888 let props = Default::default();
3889 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3890 assert_eq!(writer.get_estimated_total_bytes(), 0);
3891
3892 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3893 writer.add_data_page().unwrap();
3894 let size_with_one_page = writer.get_estimated_total_bytes();
3895 assert_eq!(size_with_one_page, 20);
3896
3897 writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3898 writer.add_data_page().unwrap();
3899 let size_with_two_pages = writer.get_estimated_total_bytes();
3900 assert_eq!(size_with_two_pages, 20 + 21);
3902 }
3903
3904 fn write_multiple_pages<T: DataType>(
3905 column_descr: &Arc<ColumnDescriptor>,
3906 pages: &[&[Option<T::T>]],
3907 ) -> Result<ColumnCloseResult> {
3908 let column_writer = get_column_writer(
3909 column_descr.clone(),
3910 Default::default(),
3911 get_test_page_writer(),
3912 );
3913 let mut writer = get_typed_column_writer::<T>(column_writer);
3914
3915 for &page in pages {
3916 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3917 let def_levels = page
3918 .iter()
3919 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3920 .collect::<Vec<_>>();
3921 writer.write_batch(&values, Some(&def_levels), None)?;
3922 writer.flush_data_pages()?;
3923 }
3924
3925 writer.close()
3926 }
3927
3928 fn column_roundtrip_random<T: DataType>(
3932 props: WriterProperties,
3933 max_size: usize,
3934 min_value: T::T,
3935 max_value: T::T,
3936 max_def_level: i16,
3937 max_rep_level: i16,
3938 ) where
3939 T::T: PartialOrd + SampleUniform + Copy,
3940 {
3941 let mut num_values: usize = 0;
3942
3943 let mut buf: Vec<i16> = Vec::new();
3944 let def_levels = if max_def_level > 0 {
3945 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3946 for &dl in &buf[..] {
3947 if dl == max_def_level {
3948 num_values += 1;
3949 }
3950 }
3951 Some(&buf[..])
3952 } else {
3953 num_values = max_size;
3954 None
3955 };
3956
3957 let mut buf: Vec<i16> = Vec::new();
3958 let rep_levels = if max_rep_level > 0 {
3959 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3960 buf[0] = 0; Some(&buf[..])
3962 } else {
3963 None
3964 };
3965
3966 let mut values: Vec<T::T> = Vec::new();
3967 random_numbers_range(num_values, min_value, max_value, &mut values);
3968
3969 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3970 }
3971
3972 fn column_roundtrip<T: DataType>(
3974 props: WriterProperties,
3975 values: &[T::T],
3976 def_levels: Option<&[i16]>,
3977 rep_levels: Option<&[i16]>,
3978 ) {
3979 let mut file = tempfile::tempfile().unwrap();
3980 let mut write = TrackedWrite::new(&mut file);
3981 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3982
3983 let max_def_level = match def_levels {
3984 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3985 None => 0i16,
3986 };
3987
3988 let max_rep_level = match rep_levels {
3989 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3990 None => 0i16,
3991 };
3992
3993 let mut max_batch_size = values.len();
3994 if let Some(levels) = def_levels {
3995 max_batch_size = max_batch_size.max(levels.len());
3996 }
3997 if let Some(levels) = rep_levels {
3998 max_batch_size = max_batch_size.max(levels.len());
3999 }
4000
4001 let mut writer =
4002 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
4003
4004 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
4005 assert_eq!(values_written, values.len());
4006 let result = writer.close().unwrap();
4007
4008 drop(write);
4009
4010 let props = ReaderProperties::builder()
4011 .set_backward_compatible_lz4(false)
4012 .build();
4013 let page_reader = Box::new(
4014 SerializedPageReader::new_with_properties(
4015 Arc::new(file),
4016 &result.metadata,
4017 result.rows_written as usize,
4018 None,
4019 Arc::new(props),
4020 )
4021 .unwrap(),
4022 );
4023 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
4024
4025 let mut actual_values = Vec::with_capacity(max_batch_size);
4026 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
4027 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
4028
4029 let (_, values_read, levels_read) = reader
4030 .read_records(
4031 max_batch_size,
4032 actual_def_levels.as_mut(),
4033 actual_rep_levels.as_mut(),
4034 &mut actual_values,
4035 )
4036 .unwrap();
4037
4038 assert_eq!(&actual_values[..values_read], values);
4041 match actual_def_levels {
4042 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
4043 None => assert_eq!(None, def_levels),
4044 }
4045 match actual_rep_levels {
4046 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
4047 None => assert_eq!(None, rep_levels),
4048 }
4049
4050 if let Some(levels) = actual_rep_levels {
4053 let mut actual_rows_written = 0;
4054 for l in levels {
4055 if l == 0 {
4056 actual_rows_written += 1;
4057 }
4058 }
4059 assert_eq!(actual_rows_written, result.rows_written);
4060 } else if actual_def_levels.is_some() {
4061 assert_eq!(levels_read as u64, result.rows_written);
4062 } else {
4063 assert_eq!(values_read as u64, result.rows_written);
4064 }
4065 }
4066
4067 fn column_write_and_get_metadata<T: DataType>(
4070 props: WriterProperties,
4071 values: &[T::T],
4072 ) -> ColumnChunkMetaData {
4073 let page_writer = get_test_page_writer();
4074 let props = Arc::new(props);
4075 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4076 writer.write_batch(values, None, None).unwrap();
4077 writer.close().unwrap().metadata
4078 }
4079
4080 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4082 PageEncodingStats {
4083 page_type,
4084 encoding,
4085 count,
4086 }
4087 }
4088
4089 fn check_encoding_write_support<T: DataType>(
4093 version: WriterVersion,
4094 dict_enabled: bool,
4095 data: &[T::T],
4096 dictionary_page_offset: Option<i64>,
4097 encodings: &[Encoding],
4098 page_encoding_stats: &[PageEncodingStats],
4099 ) {
4100 let props = WriterProperties::builder()
4101 .set_writer_version(version)
4102 .set_dictionary_enabled(dict_enabled)
4103 .build();
4104 let meta = column_write_and_get_metadata::<T>(props, data);
4105 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4106 assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
4107 assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4108 }
4109
4110 fn get_test_column_writer<'a, T: DataType>(
4112 page_writer: Box<dyn PageWriter + 'a>,
4113 max_def_level: i16,
4114 max_rep_level: i16,
4115 props: WriterPropertiesPtr,
4116 ) -> ColumnWriterImpl<'a, T> {
4117 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4118 let column_writer = get_column_writer(descr, props, page_writer);
4119 get_typed_column_writer::<T>(column_writer)
4120 }
4121
4122 fn get_test_column_reader<T: DataType>(
4124 page_reader: Box<dyn PageReader>,
4125 max_def_level: i16,
4126 max_rep_level: i16,
4127 ) -> ColumnReaderImpl<T> {
4128 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4129 let column_reader = get_column_reader(descr, page_reader);
4130 get_typed_column_reader::<T>(column_reader)
4131 }
4132
4133 fn get_test_column_descr<T: DataType>(
4135 max_def_level: i16,
4136 max_rep_level: i16,
4137 ) -> ColumnDescriptor {
4138 let path = ColumnPath::from("col");
4139 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4140 .with_length(1)
4143 .build()
4144 .unwrap();
4145 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4146 }
4147
4148 fn get_test_page_writer() -> Box<dyn PageWriter> {
4150 Box::new(TestPageWriter {})
4151 }
4152
4153 struct TestPageWriter {}
4154
4155 impl PageWriter for TestPageWriter {
4156 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4157 let mut res = PageWriteSpec::new();
4158 res.page_type = page.page_type();
4159 res.uncompressed_size = page.uncompressed_size();
4160 res.compressed_size = page.compressed_size();
4161 res.num_values = page.num_values();
4162 res.offset = 0;
4163 res.bytes_written = page.data().len() as u64;
4164 Ok(res)
4165 }
4166
4167 fn close(&mut self) -> Result<()> {
4168 Ok(())
4169 }
4170 }
4171
4172 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4174 let page_writer = get_test_page_writer();
4175 let props = Default::default();
4176 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4177 writer.write_batch(values, None, None).unwrap();
4178
4179 let metadata = writer.close().unwrap().metadata;
4180 if let Some(stats) = metadata.statistics() {
4181 stats.clone()
4182 } else {
4183 panic!("metadata missing statistics");
4184 }
4185 }
4186
4187 fn get_test_decimals_column_writer<T: DataType>(
4189 page_writer: Box<dyn PageWriter>,
4190 max_def_level: i16,
4191 max_rep_level: i16,
4192 props: WriterPropertiesPtr,
4193 ) -> ColumnWriterImpl<'static, T> {
4194 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4195 max_def_level,
4196 max_rep_level,
4197 ));
4198 let column_writer = get_column_writer(descr, props, page_writer);
4199 get_typed_column_writer::<T>(column_writer)
4200 }
4201
4202 fn get_test_decimals_column_descr<T: DataType>(
4204 max_def_level: i16,
4205 max_rep_level: i16,
4206 ) -> ColumnDescriptor {
4207 let path = ColumnPath::from("col");
4208 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4209 .with_length(16)
4210 .with_logical_type(Some(LogicalType::Decimal {
4211 scale: 2,
4212 precision: 3,
4213 }))
4214 .with_scale(2)
4215 .with_precision(3)
4216 .build()
4217 .unwrap();
4218 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4219 }
4220
4221 fn float16_statistics_roundtrip(
4222 values: &[FixedLenByteArray],
4223 ) -> ValueStatistics<FixedLenByteArray> {
4224 let page_writer = get_test_page_writer();
4225 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4226 writer.write_batch(values, None, None).unwrap();
4227
4228 let metadata = writer.close().unwrap().metadata;
4229 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4230 stats.clone()
4231 } else {
4232 panic!("metadata missing statistics");
4233 }
4234 }
4235
4236 fn get_test_float16_column_writer(
4237 page_writer: Box<dyn PageWriter>,
4238 props: WriterPropertiesPtr,
4239 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4240 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4241 let column_writer = get_column_writer(descr, props, page_writer);
4242 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4243 }
4244
4245 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4246 let path = ColumnPath::from("col");
4247 let tpe =
4248 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4249 .with_length(2)
4250 .with_logical_type(Some(LogicalType::Float16))
4251 .build()
4252 .unwrap();
4253 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4254 }
4255
4256 fn get_test_interval_column_writer(
4257 page_writer: Box<dyn PageWriter>,
4258 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4259 let descr = Arc::new(get_test_interval_column_descr());
4260 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4261 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4262 }
4263
4264 fn get_test_interval_column_descr() -> ColumnDescriptor {
4265 let path = ColumnPath::from("col");
4266 let tpe =
4267 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4268 .with_length(12)
4269 .with_converted_type(ConvertedType::INTERVAL)
4270 .build()
4271 .unwrap();
4272 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4273 }
4274
4275 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4277 page_writer: Box<dyn PageWriter + 'a>,
4278 max_def_level: i16,
4279 max_rep_level: i16,
4280 props: WriterPropertiesPtr,
4281 ) -> ColumnWriterImpl<'a, T> {
4282 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4283 max_def_level,
4284 max_rep_level,
4285 ));
4286 let column_writer = get_column_writer(descr, props, page_writer);
4287 get_typed_column_writer::<T>(column_writer)
4288 }
4289
4290 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4292 max_def_level: i16,
4293 max_rep_level: i16,
4294 ) -> ColumnDescriptor {
4295 let path = ColumnPath::from("col");
4296 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4297 .with_converted_type(ConvertedType::UINT_32)
4298 .build()
4299 .unwrap();
4300 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4301 }
4302
4303 #[test]
4304 fn test_page_v2_snappy_compression_fallback() {
4305 let page_writer = TestPageWriter {};
4307
4308 let props = WriterProperties::builder()
4310 .set_writer_version(WriterVersion::PARQUET_2_0)
4311 .set_dictionary_enabled(false)
4313 .set_compression(Compression::SNAPPY)
4314 .build();
4315
4316 let mut column_writer =
4317 get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4318
4319 let values = vec![ByteArray::from("a")];
4322
4323 column_writer.write_batch(&values, None, None).unwrap();
4324
4325 let result = column_writer.close().unwrap();
4326 assert_eq!(
4327 result.metadata.uncompressed_size(),
4328 result.metadata.compressed_size()
4329 );
4330 }
4331}