1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30 BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43 OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
/// Expands to a `match` that dispatches `$b` against whichever typed
/// `ColumnWriterImpl` variant `$e` currently holds, binding the inner
/// writer to `$i`. Used by `ColumnWriter`'s methods to forward calls
/// without repeating the eight-way match at every call site.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
67
/// Column writer for a Parquet type, one variant per physical type.
pub enum ColumnWriter<'a> {
    /// Column writer for boolean type
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Column writer for int32 type
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Column writer for int64 type
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Column writer for int96 type
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Column writer for float type
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Column writer for double type
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Column writer for byte array type
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Column writer for fixed length byte array type
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage, delegating to the typed writer.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer,
    /// delegating to the typed writer.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Closes the typed writer and returns its [`ColumnCloseResult`].
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}
108
109pub fn get_column_writer<'a>(
111 descr: ColumnDescPtr,
112 props: WriterPropertiesPtr,
113 page_writer: Box<dyn PageWriter + 'a>,
114) -> ColumnWriter<'a> {
115 match descr.physical_type() {
116 Type::BOOLEAN => {
117 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
118 }
119 Type::INT32 => {
120 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
121 }
122 Type::INT64 => {
123 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
124 }
125 Type::INT96 => {
126 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
127 }
128 Type::FLOAT => {
129 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
130 }
131 Type::DOUBLE => {
132 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
133 }
134 Type::BYTE_ARRAY => {
135 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
136 }
137 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
138 ColumnWriterImpl::new(descr, props, page_writer),
139 ),
140 }
141}
142
143pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
148 T::get_column_writer(col_writer).unwrap_or_else(|| {
149 panic!(
150 "Failed to convert column writer into a typed column writer for `{}` type",
151 T::get_physical_type()
152 )
153 })
154}
155
156pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
158 col_writer: &'b ColumnWriter<'a>,
159) -> &'b ColumnWriterImpl<'a, T> {
160 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
161 panic!(
162 "Failed to convert column writer into a typed column writer for `{}` type",
163 T::get_physical_type()
164 )
165 })
166}
167
168pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
170 col_writer: &'a mut ColumnWriter<'b>,
171) -> &'a mut ColumnWriterImpl<'b, T> {
172 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
173 panic!(
174 "Failed to convert column writer into a typed column writer for `{}` type",
175 T::get_physical_type()
176 )
177 })
178}
179
/// The result of calling `close` on a column writer.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written for this column chunk.
    pub bytes_written: u64,
    /// Total number of rows written for this column chunk.
    pub rows_written: u64,
    /// Metadata for the written column chunk.
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter built for this column chunk, if enabled.
    pub bloom_filter: Option<Sbbf>,
    /// Column index (per-page min/max statistics), if enabled.
    pub column_index: Option<ColumnIndexMetaData>,
    /// Offset index (page locations), if enabled.
    pub offset_index: Option<OffsetIndexMetaData>,
}
198
/// Counters and optional level histograms for the data page currently being buffered.
#[derive(Default)]
struct PageMetrics {
    // Number of values buffered for the current page, nulls included.
    num_buffered_values: u32,
    // Number of rows (records) buffered for the current page.
    num_buffered_rows: u32,
    // Number of nulls buffered for the current page.
    num_page_nulls: u64,
    // Per-page repetition-level histogram; `None` when statistics are disabled.
    repetition_level_histogram: Option<LevelHistogram>,
    // Per-page definition-level histogram; `None` when statistics are disabled.
    definition_level_histogram: Option<LevelHistogram>,
}
208
209impl PageMetrics {
210 fn new() -> Self {
211 Default::default()
212 }
213
214 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
216 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
217 self
218 }
219
220 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
222 self.definition_level_histogram = LevelHistogram::try_new(max_level);
223 self
224 }
225
226 fn new_page(&mut self) {
229 self.num_buffered_values = 0;
230 self.num_buffered_rows = 0;
231 self.num_page_nulls = 0;
232 self.repetition_level_histogram
233 .as_mut()
234 .map(LevelHistogram::reset);
235 self.definition_level_histogram
236 .as_mut()
237 .map(LevelHistogram::reset);
238 }
239
240 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
242 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
243 rep_hist.update_from_levels(levels);
244 }
245 }
246
247 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
249 if let Some(ref mut def_hist) = self.definition_level_histogram {
250 def_hist.update_from_levels(levels);
251 }
252 }
253}
254
/// Running totals and statistics accumulated across the whole column chunk.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    // Total bytes flushed to the page writer so far.
    total_bytes_written: u64,
    // Total rows written across all pages.
    total_rows_written: u64,
    // Sum of uncompressed page sizes.
    total_uncompressed_size: u64,
    // Sum of compressed page sizes.
    total_compressed_size: u64,
    // Total number of values written (data pages only).
    total_num_values: u64,
    // File offset of the dictionary page, once written.
    dictionary_page_offset: Option<u64>,
    // File offset of the first data page, once written.
    data_page_offset: Option<u64>,
    // Chunk-level min value seen so far.
    min_column_value: Option<T>,
    // Chunk-level max value seen so far.
    max_column_value: Option<T>,
    // Total null count for the chunk.
    num_column_nulls: u64,
    // Distinct count, only retained when supplied before any values are written.
    column_distinct_count: Option<u64>,
    // Accumulated unencoded byte length of variable-length values, if tracked.
    variable_length_bytes: Option<i64>,
    // Chunk-level repetition-level histogram; `None` when statistics are disabled.
    repetition_level_histogram: Option<LevelHistogram>,
    // Chunk-level definition-level histogram; `None` when statistics are disabled.
    definition_level_histogram: Option<LevelHistogram>,
}
273
274impl<T: Default> ColumnMetrics<T> {
275 fn new() -> Self {
276 Default::default()
277 }
278
279 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
281 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
282 self
283 }
284
285 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
287 self.definition_level_histogram = LevelHistogram::try_new(max_level);
288 self
289 }
290
291 fn update_histogram(
293 chunk_histogram: &mut Option<LevelHistogram>,
294 page_histogram: &Option<LevelHistogram>,
295 ) {
296 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
297 chunk_hist.add(page_hist);
298 }
299 }
300
301 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
304 ColumnMetrics::<T>::update_histogram(
305 &mut self.definition_level_histogram,
306 &page_metrics.definition_level_histogram,
307 );
308 ColumnMetrics::<T>::update_histogram(
309 &mut self.repetition_level_histogram,
310 &page_metrics.repetition_level_histogram,
311 );
312 }
313
314 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
316 if let Some(var_bytes) = variable_length_bytes {
317 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
318 }
319 }
320}
321
/// Typed column writer: [`GenericColumnWriter`] specialized with the default
/// value encoder for physical type `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
324
/// Column writer generic over the value encoder `E`; buffers values and
/// levels, assembles pages, and accumulates chunk-level metadata.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Column descriptor for the column being written.
    descr: ColumnDescPtr,
    // Writer properties (batch size, page limits, statistics settings, ...).
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    // Sink that receives finished compressed pages.
    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    // Per-page counters/histograms, reset after each page.
    page_metrics: PageMetrics,
    // Chunk-level running totals.
    column_metrics: ColumnMetrics<E::T>,

    // Set of encodings used; BTreeSet keeps the emitted order deterministic.
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    // Buffered definition levels for the current page.
    def_levels_sink: Vec<i16>,
    // Buffered repetition levels for the current page.
    rep_levels_sink: Vec<i16>,
    // Data pages held back until the dictionary page is written first.
    data_pages: VecDeque<CompressedPage>,
    // Builders for the page index structures.
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Ordering flags for page min/max boundaries; both start true and are
    // cleared as soon as a page breaks the respective order.
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    // (min, max) of the last non-null data page, used for the ordering checks.
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
360
361impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
362 pub fn new(
364 descr: ColumnDescPtr,
365 props: WriterPropertiesPtr,
366 page_writer: Box<dyn PageWriter + 'a>,
367 ) -> Self {
368 let codec = props.compression(descr.path());
369 let codec_options = CodecOptionsBuilder::default().build();
370 let compressor = create_codec(codec, &codec_options).unwrap();
371 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
372
373 let statistics_enabled = props.statistics_enabled(descr.path());
374
375 let mut encodings = BTreeSet::new();
376 encodings.insert(Encoding::RLE);
378
379 let mut page_metrics = PageMetrics::new();
380 let mut column_metrics = ColumnMetrics::<E::T>::new();
381
382 if statistics_enabled != EnabledStatistics::None {
384 page_metrics = page_metrics
385 .with_repetition_level_histogram(descr.max_rep_level())
386 .with_definition_level_histogram(descr.max_def_level());
387 column_metrics = column_metrics
388 .with_repetition_level_histogram(descr.max_rep_level())
389 .with_definition_level_histogram(descr.max_def_level())
390 }
391
392 let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
394 if statistics_enabled != EnabledStatistics::Page {
395 column_index_builder.to_invalid()
396 }
397
398 let offset_index_builder = match props.offset_index_disabled() {
400 false => Some(OffsetIndexBuilder::new()),
401 _ => None,
402 };
403
404 Self {
405 descr,
406 props,
407 statistics_enabled,
408 page_writer,
409 codec,
410 compressor,
411 encoder,
412 def_levels_sink: vec![],
413 rep_levels_sink: vec![],
414 data_pages: VecDeque::new(),
415 page_metrics,
416 column_metrics,
417 column_index_builder,
418 offset_index_builder,
419 encodings,
420 encoding_stats: vec![],
421 data_page_boundary_ascending: true,
422 data_page_boundary_descending: true,
423 last_non_null_data_page_min_max: None,
424 }
425 }
426
    /// Writes a batch of values and levels, splitting the work into
    /// `write_batch_size`-sized mini batches aligned to record boundaries.
    /// Optional chunk-level `min`/`max`/`distinct_count` supplied by the caller
    /// are merged into the column metrics. Returns the number of values written.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Definition and repetition level slices, when both given, must align.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // Without definition levels every value is written, so the level count
        // falls back to the value count.
        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // A caller-supplied distinct count is only trustworthy when nothing has
        // been written yet; any later write invalidates it.
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Extend the batch forward until it ends on a record boundary
            // (the next repetition level of 0), so a record never straddles
            // two mini batches.
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        // Return total number of values processed.
        Ok(values_offset)
    }
505
    /// Writes a batch of values with optional definition and repetition levels.
    /// Returns the number of values written (which may be fewer than the number
    /// of levels when definition levels encode nulls). Levels may be omitted
    /// for non-nullable and/or non-repeated columns.
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }
526
    /// Writes a batch like [`Self::write_batch`], but additionally accepts
    /// precomputed `min`/`max`/`distinct_count` that are merged into the
    /// chunk-level statistics instead of being derived from the values.
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }
553
    /// Returns an estimate of this writer's current memory usage: bytes already
    /// written plus the encoder's estimated in-memory footprint.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }
562
    /// Returns the total number of bytes written to the page writer so far.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
571
572 #[cfg(feature = "arrow")]
578 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
579 self.data_pages
580 .iter()
581 .map(|page| page.data().len() as u64)
582 .sum::<u64>()
583 + self.column_metrics.total_bytes_written
584 + self.encoder.estimated_data_page_size() as u64
585 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
586 }
587
    /// Returns the total number of rows written so far.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
593
    /// Returns a reference to this column's descriptor.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
598
    /// Finalizes all buffered state and closes this column writer, returning
    /// the bytes/rows written plus chunk metadata and optional page indexes.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        // Flush any partially filled final data page.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        // A dictionary page must precede the buffered data pages.
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        // Translate the incrementally maintained ordering flags into a
        // BoundaryOrder. When both flags are still true (e.g. all pages equal
        // or all pages null), ASCENDING wins.
        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        // Only emit a column index if the builder was never invalidated.
        let column_index = match self.column_index_builder.valid() {
            true => Some(self.column_index_builder.build()?),
            false => None,
        };

        let offset_index = self.offset_index_builder.map(|b| b.build());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }
640
    /// Writes one mini batch of levels and values (already sliced to a record
    /// boundary by the caller), then flushes a data page and/or triggers
    /// dictionary fallback when the configured thresholds are reached.
    /// Returns the number of values written.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Process definition levels: only a level equal to the max definition
        // level carries an actual value; everything lower is a null somewhere
        // up the nesting chain.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            let values_to_write = levels
                .iter()
                .map(|level| (*level == self.descr.max_def_level()) as usize)
                .sum();
            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;

            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            // No definition levels: every level position is a value.
            num_levels
        };

        // Process repetition levels: a repetition level of 0 marks the start
        // of a new record, which is how rows are counted here.
        if self.descr.max_rep_level() > 0 {
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            // Writes may not begin mid-record.
            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // Count record starts (level == 0) as buffered rows.
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            // Non-repeated column: each level (null or not) is its own row.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        // Write values either gathered through an index list or as a
        // contiguous run starting at `values_offset`.
        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
729
730 #[inline]
735 fn should_dict_fallback(&self) -> bool {
736 match self.encoder.estimated_dict_page_size() {
737 Some(size) => {
738 size >= self
739 .props
740 .column_dictionary_page_size_limit(self.descr.path())
741 }
742 None => false,
743 }
744 }
745
746 #[inline]
748 fn should_add_data_page(&self) -> bool {
749 if self.page_metrics.num_buffered_values == 0 {
754 return false;
755 }
756
757 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
758 || self.encoder.estimated_data_page_size()
759 >= self.props.column_data_page_size_limit(self.descr.path())
760 }
761
    /// Performs dictionary fallback: flushes any buffered values into a final
    /// dictionary-encoded data page, writes the dictionary page, then flushes
    /// all buffered data pages to the sink.
    fn dict_fallback(&mut self) -> Result<()> {
        // At this point we know that we need to fall back.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }
773
    /// Updates the column index and offset index builders for the page that is
    /// about to be flushed, and maintains the cross-page min/max ordering flags.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // A page is a "null page" when every buffered row is null.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        if null_page && self.column_index_builder.valid() {
            // Null pages get empty min/max byte entries in the column index.
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            match &page_statistics {
                None => {
                    // Without page statistics a meaningful column index cannot
                    // be built for this chunk; invalidate the builder.
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    // Maintain the boundary-order flags by comparing this
                    // page's min/max with the previous non-null page's.
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            // Previous min/max exceeding the new ones breaks ascending order.
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            // New min/max exceeding the previous ones breaks descending order.
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    // Append min/max bytes, truncated when the type allows it.
                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Append the per-page level histograms to the column index.
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        // Update the offset index with this page's row count and (optional)
        // unencoded byte-array length.
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
863
864 fn can_truncate_value(&self) -> bool {
866 match self.descr.physical_type() {
867 Type::FIXED_LEN_BYTE_ARRAY
871 if !matches!(
872 self.descr.logical_type_ref(),
873 Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
874 ) =>
875 {
876 true
877 }
878 Type::BYTE_ARRAY => true,
879 _ => false,
881 }
882 }
883
884 fn is_utf8(&self) -> bool {
886 self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
887 || self.get_descriptor().converted_type() == ConvertedType::UTF8
888 }
889
890 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
901 truncation_length
902 .filter(|l| data.len() > *l)
903 .and_then(|l|
904 if self.is_utf8() {
906 match str::from_utf8(data) {
907 Ok(str_data) => truncate_utf8(str_data, l),
908 Err(_) => Some(data[..l].to_vec()),
909 }
910 } else {
911 Some(data[..l].to_vec())
912 }
913 )
914 .map(|truncated| (truncated, true))
915 .unwrap_or_else(|| (data.to_vec(), false))
916 }
917
918 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
932 truncation_length
933 .filter(|l| data.len() > *l)
934 .and_then(|l|
935 if self.is_utf8() {
937 match str::from_utf8(data) {
938 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
939 Err(_) => increment(data[..l].to_vec()),
940 }
941 } else {
942 increment(data[..l].to_vec())
943 }
944 )
945 .map(|truncated| (truncated, true))
946 .unwrap_or_else(|| (data.to_vec(), false))
947 }
948
    /// Applies the configured statistics truncation length to byte-array
    /// min/max values, marking them inexact when truncated. Other statistics
    /// variants pass through unchanged.
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    // A truncated bound is no longer the exact min/max.
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // FIXED_LEN_BYTE_ARRAY is only truncated when the logical type
            // permits it (see `can_truncate_value`).
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // All other variants (and unset min/max) are returned unchanged.
            stats => stats,
        }
    }
1001
    /// Finalizes the current page: encodes levels and values into a
    /// (possibly compressed) data page, updates page/column indexes and chunk
    /// metrics, then either buffers the page (while dictionary-encoding) or
    /// writes it straight to the sink. Resets the per-page state afterwards.
    fn add_data_page(&mut self) -> Result<()> {
        // Extract the encoded values for this page from the encoder.
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Fold the page min/max into the chunk-level statistics.
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Page statistics are only materialized at page-level setting.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // Update column index and offset index before the page metrics reset.
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Merge per-page histograms and variable-length byte counts into the
        // chunk metrics.
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on statistics are only needed if they go into the page
        // header, in which case they are truncated per the writer properties.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // v1 layout: [rep levels][def levels][values], compressed as a whole.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                // v2 layout: levels stay uncompressed; only values are compressed.
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Skip compression when it does not actually shrink the values.
                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        if uncompressed_size <= buffer.len() - buffer_len {
                            // Compression was not beneficial; keep raw values.
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // While dictionary-encoding, pages are buffered so the dictionary page
        // can be written first; otherwise write the page immediately.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset the per-page state for the next page.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1166
    /// Flushes any partially built page, then writes all buffered data pages
    /// to the underlying sink in order.
    #[inline]
    fn flush_data_pages(&mut self) -> Result<()> {
        // Write the last buffered page, if any values remain.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }

        // Drain pages one at a time (pop before write keeps the mutable
        // borrow of `self.data_pages` disjoint from `write_data_page`).
        while let Some(page) = self.data_pages.pop_front() {
            self.write_data_page(page)?;
        }

        Ok(())
    }
1182
    /// Assembles the [`ColumnChunkMetaData`] for this chunk from the
    /// accumulated column metrics, statistics, and encryption properties.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // If no data page offset was recorded, no pages were written; 0 is used.
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            // Chunk-level statistics from the accumulated column metrics.
            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = self.truncate_statistics(statistics);

            // Histograms are `take`n: ownership moves into the metadata.
            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1237
    /// Encodes `levels` with the v1 level encoder using the given `encoding`
    /// and returns the encoded bytes.
    #[inline]
    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }
1245
    /// Encodes `levels` with the v2 level encoder (no explicit encoding
    /// parameter, unlike v1) and returns the encoded bytes.
    #[inline]
    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }
1254
    /// Writes a compressed data page to the underlying sink, updating the
    /// encoding set, page encoding stats, offset index, and chunk metrics.
    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        // Coalesce consecutive pages of the same type and encoding into one
        // stats entry; otherwise start a new entry.
        match self.encoding_stats.last_mut() {
            Some(encoding_stats)
                if encoding_stats.page_type == page.page_type()
                    && encoding_stats.encoding == page.encoding() =>
            {
                encoding_stats.count += 1;
            }
            _ => {
                self.encoding_stats.push(PageEncodingStats {
                    page_type: page.page_type(),
                    encoding: page.encoding(),
                    count: 1,
                });
            }
        }
        let page_spec = self.page_writer.write_page(page)?;
        // Record this page's file offset and compressed size in the offset index.
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }
1286
1287 #[inline]
1289 fn write_dictionary_page(&mut self) -> Result<()> {
1290 let compressed_page = {
1291 let mut page = self
1292 .encoder
1293 .flush_dict_page()?
1294 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1295
1296 let uncompressed_size = page.buf.len();
1297
1298 if let Some(ref mut cmpr) = self.compressor {
1299 let mut output_buf = Vec::with_capacity(uncompressed_size);
1300 cmpr.compress(&page.buf, &mut output_buf)?;
1301 page.buf = Bytes::from(output_buf);
1302 }
1303
1304 let dict_page = Page::DictionaryPage {
1305 buf: page.buf,
1306 num_values: page.num_values as u32,
1307 encoding: self.props.dictionary_page_encoding(),
1308 is_sorted: page.is_sorted,
1309 };
1310 CompressedPage::new(dict_page, uncompressed_size)
1311 };
1312
1313 self.encodings.insert(compressed_page.encoding());
1314 self.encoding_stats.push(PageEncodingStats {
1315 page_type: PageType::DICTIONARY_PAGE,
1316 encoding: compressed_page.encoding(),
1317 count: 1,
1318 });
1319 let page_spec = self.page_writer.write_page(compressed_page)?;
1320 self.update_metrics_for_page(page_spec);
1321 Ok(())
1323 }
1324
1325 #[inline]
1327 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1328 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1329 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1330 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1331
1332 match page_spec.page_type {
1333 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1334 self.column_metrics.total_num_values += page_spec.num_values as u64;
1335 if self.column_metrics.data_page_offset.is_none() {
1336 self.column_metrics.data_page_offset = Some(page_spec.offset);
1337 }
1338 }
1339 PageType::DICTIONARY_PAGE => {
1340 assert!(
1341 self.column_metrics.dictionary_page_offset.is_none(),
1342 "Dictionary offset is already set"
1343 );
1344 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1345 }
1346 _ => {}
1347 }
1348 }
1349
1350 #[inline]
1351 #[cfg(feature = "encryption")]
1352 fn set_column_chunk_encryption_properties(
1353 &self,
1354 builder: ColumnChunkMetaDataBuilder,
1355 ) -> ColumnChunkMetaDataBuilder {
1356 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1357 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1358 encryption_properties,
1359 &self.descr,
1360 ))
1361 } else {
1362 builder
1363 }
1364 }
1365
    /// No-op counterpart used when the `encryption` feature is disabled:
    /// returns the builder unchanged so callers need no feature gating.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1374}
1375
/// Replaces `min` with `val` when `val` sorts below the current minimum
/// under the column's sort order (NaN candidates are skipped by `update_stat`).
fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    // Update when the current minimum is greater than the candidate value.
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}
1379
/// Replaces `max` with `val` when `val` sorts above the current maximum
/// under the column's sort order (NaN candidates are skipped by `update_stat`).
fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    // Update when the candidate value is greater than the current maximum.
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}
1383
/// Returns `true` if `val` is a floating-point NaN: a FLOAT/DOUBLE value,
/// or a FIXED_LEN_BYTE_ARRAY carrying the Float16 logical type.
#[inline]
#[allow(clippy::eq_op)] // `val != val` is the intentional IEEE-754 NaN test
fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
    match T::PHYSICAL_TYPE {
        // NaN is the only value that compares unequal to itself.
        Type::FLOAT | Type::DOUBLE => val != val,
        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
            // Float16 is decoded from the first two bytes, little-endian.
            let val = val.as_bytes();
            let val = f16::from_le_bytes([val[0], val[1]]);
            val.is_nan()
        }
        _ => false,
    }
}
1397
/// Replaces `*cur` with a clone of `val` when no current value exists or
/// `should_update(current)` returns `true`.
///
/// NaN values never participate in min/max statistics and are skipped.
fn update_stat<T: ParquetValueType, F>(
    descr: &ColumnDescriptor,
    val: &T,
    cur: &mut Option<T>,
    should_update: F,
) where
    F: Fn(&T) -> bool,
{
    if is_nan(descr, val) {
        return;
    }

    if cur.as_ref().is_none_or(should_update) {
        *cur = Some(val.clone());
    }
}
1418
/// Evaluates `a > b` according to the column's sort order.
///
/// The default is the value type's own ordering; columns whose
/// logical/converted type declares a different order are special-cased:
/// unsigned integers, byte-array decimals, and Float16.
fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::INT32 | Type::INT64 => {
            // An unsigned logical type overrides the signed physical order.
            if let Some(LogicalType::Integer {
                is_signed: false, ..
            }) = descr.logical_type_ref()
            {
                return compare_greater_unsigned_int(a, b);
            }

            // Legacy (converted-type) spelling of the same unsigned types.
            match descr.converted_type() {
                ConvertedType::UINT_8
                | ConvertedType::UINT_16
                | ConvertedType::UINT_32
                | ConvertedType::UINT_64 => {
                    return compare_greater_unsigned_int(a, b);
                }
                _ => {}
            };
        }
        Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
            // Decimals compare by numeric (two's-complement) value, not bytes.
            if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            if let ConvertedType::DECIMAL = descr.converted_type() {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            // Float16 compares numerically on the decoded half-float.
            if let Some(LogicalType::Float16) = descr.logical_type_ref() {
                return compare_greater_f16(a.as_bytes(), b.as_bytes());
            }
        }

        _ => {}
    }

    // Fall back to the value type's own comparison.
    a > b
}
1459
1460fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1468 match (kind, props.writer_version()) {
1469 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1470 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1471 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1472 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1473 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1474 _ => Encoding::PLAIN,
1475 }
1476}
1477
1478fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1480 match (kind, props.writer_version()) {
1481 (Type::BOOLEAN, _) => false,
1483 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1485 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1486 _ => true,
1487 }
1488}
1489
/// Compares two integer values as unsigned; used for UINT_* columns whose
/// sort order differs from the signed physical representation.
/// NOTE(review): `as_u64().unwrap()` assumes an integer value type — callers
/// only reach this from the INT32/INT64 arm of `compare_greater`.
#[inline]
fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
    a.as_u64().unwrap() > b.as_u64().unwrap()
}
1494
/// Compares two Float16 values stored as 2-byte little-endian buffers.
/// Panics if either slice is not exactly 2 bytes (`try_into` to `[u8; 2]`).
#[inline]
fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
    let a = f16::from_le_bytes(a.try_into().unwrap());
    let b = f16::from_le_bytes(b.try_into().unwrap());
    a > b
}
1501
/// Returns `true` if the big-endian two's-complement decimal `a` is
/// numerically greater than `b`.
///
/// Inputs may have different lengths: the shorter value is treated as if
/// sign-extended to the longer length (so `0xFF10` equals `0x10`).
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    // An empty value sorts below any non-empty value.
    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Short circuit for values of different sign, or equal-length values
    // whose leading bytes already differ. The equal-length requirement is
    // necessary because of sign extension: 0xFF10 must compare equal to 0x10.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Both values have the same sign from here on.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        // If the longer value has any lead byte that is not pure sign
        // extension, its larger width decides the comparison outright.
        let not_equal = if a_length > b_length {
            let lead_length = a_length - b_length;
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            let lead_length = b_length - a_length;
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            // Wider magnitude: larger for positives, smaller for negatives.
            return if negative_values { !a_longer } else { a_longer };
        }
    }

    // The longer value's lead bytes are pure sign extension, so compare the
    // low-order `min(a_length, b_length)` bytes of each value unsigned.
    //
    // BUG FIX: the previous `(a[1..]) > (b[1..])` misaligned the operands
    // whenever the lengths differed (e.g. it reported 0x0005 > 0x06 and
    // 0x0005 > 0x05). For equal lengths the first bytes are already known
    // equal here, so this is equivalent to the old behavior in that case.
    let common = a_length.min(b_length);
    a[a_length - common..] > b[b_length - common..]
}
1547
/// Truncates a UTF-8 string to at most `length` bytes without splitting a
/// code point. Returns `None` when no non-empty prefix of at most `length`
/// bytes ends on a character boundary (e.g. `length == 0` or empty input).
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk backwards from the byte limit to the nearest char boundary.
    let mut end = length.min(data.len());
    while end > 0 && !data.is_char_boundary(end) {
        end -= 1;
    }
    (end > 0).then(|| data.as_bytes()[..end].to_vec())
}
1557
/// Truncates `data` to at most `length` bytes at a char boundary, then
/// increments the truncated prefix (via `increment_utf8`) so the result is
/// an exclusive upper bound for any string beginning with that prefix.
fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // A UTF-8 code point spans at most 4 bytes, so if any boundary exists
    // near the limit it lies within the last 4 candidate positions.
    let lower_bound = length.saturating_sub(3);
    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
    increment_utf8(data.get(..split)?)
}
1569
/// Produces a byte sequence strictly greater than `data` by incrementing one
/// of its characters and dropping everything after it.
///
/// Scans backwards from the last character; a character is only incremented
/// when its successor is a valid `char` of the same UTF-8 width, so the
/// result never grows longer than the input. Returns `None` when no
/// character can be incremented (e.g. an empty string).
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    for (idx, ch) in data.char_indices().rev() {
        let width = ch.len_utf8();
        // `from_u32` rejects surrogates and values past U+10FFFF.
        let next = char::from_u32(ch as u32 + 1).filter(|c| c.len_utf8() == width);
        if let Some(next) = next {
            // Keep everything up to this character's slot, then overwrite
            // the slot with the incremented character.
            let mut bytes = data.as_bytes()[..idx + width].to_vec();
            next.encode_utf8(&mut bytes[idx..]);
            return Some(bytes);
        }
    }

    None
}
1591
/// Treats `data` as a big-endian unsigned integer and adds one in place.
/// Returns `None` when every byte carries (all bytes were 0xFF), since the
/// result would need an extra byte.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // Ripple-carry starting from the least significant (last) byte.
    for byte in data.iter_mut().rev() {
        *byte = byte.wrapping_add(1);
        // A non-zero result means the carry stopped here.
        if *byte != 0 {
            return Some(data);
        }
    }

    None
}
1607
1608#[cfg(test)]
1609mod tests {
1610 use crate::{
1611 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1612 schema::parser::parse_message_type,
1613 };
1614 use core::str;
1615 use rand::distr::uniform::SampleUniform;
1616 use std::{fs::File, sync::Arc};
1617
1618 use crate::column::{
1619 page::PageReader,
1620 reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1621 };
1622 use crate::file::writer::TrackedWrite;
1623 use crate::file::{
1624 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1625 };
1626 use crate::schema::types::{ColumnPath, Type as SchemaType};
1627 use crate::util::test_common::rand_gen::random_numbers_range;
1628
1629 use super::*;
1630
1631 #[test]
1632 fn test_column_writer_inconsistent_def_rep_length() {
1633 let page_writer = get_test_page_writer();
1634 let props = Default::default();
1635 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1636 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1637 assert!(res.is_err());
1638 if let Err(err) = res {
1639 assert_eq!(
1640 format!("{err}"),
1641 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1642 );
1643 }
1644 }
1645
1646 #[test]
1647 fn test_column_writer_invalid_def_levels() {
1648 let page_writer = get_test_page_writer();
1649 let props = Default::default();
1650 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1651 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1652 assert!(res.is_err());
1653 if let Err(err) = res {
1654 assert_eq!(
1655 format!("{err}"),
1656 "Parquet error: Definition levels are required, because max definition level = 1"
1657 );
1658 }
1659 }
1660
1661 #[test]
1662 fn test_column_writer_invalid_rep_levels() {
1663 let page_writer = get_test_page_writer();
1664 let props = Default::default();
1665 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1666 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1667 assert!(res.is_err());
1668 if let Err(err) = res {
1669 assert_eq!(
1670 format!("{err}"),
1671 "Parquet error: Repetition levels are required, because max repetition level = 1"
1672 );
1673 }
1674 }
1675
1676 #[test]
1677 fn test_column_writer_not_enough_values_to_write() {
1678 let page_writer = get_test_page_writer();
1679 let props = Default::default();
1680 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1681 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1682 assert!(res.is_err());
1683 if let Err(err) = res {
1684 assert_eq!(
1685 format!("{err}"),
1686 "Parquet error: Expected to write 4 values, but have only 2"
1687 );
1688 }
1689 }
1690
1691 #[test]
1692 fn test_column_writer_write_only_one_dictionary_page() {
1693 let page_writer = get_test_page_writer();
1694 let props = Default::default();
1695 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1696 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1697 writer.add_data_page().unwrap();
1699 writer.write_dictionary_page().unwrap();
1700 let err = writer.write_dictionary_page().unwrap_err().to_string();
1701 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1702 }
1703
1704 #[test]
1705 fn test_column_writer_error_when_writing_disabled_dictionary() {
1706 let page_writer = get_test_page_writer();
1707 let props = Arc::new(
1708 WriterProperties::builder()
1709 .set_dictionary_enabled(false)
1710 .build(),
1711 );
1712 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1713 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1714 let err = writer.write_dictionary_page().unwrap_err().to_string();
1715 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1716 }
1717
1718 #[test]
1719 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1720 let page_writer = get_test_page_writer();
1721 let props = Arc::new(
1722 WriterProperties::builder()
1723 .set_dictionary_enabled(true)
1724 .build(),
1725 );
1726 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1727 writer
1728 .write_batch(&[true, false, true, false], None, None)
1729 .unwrap();
1730
1731 let r = writer.close().unwrap();
1732 assert_eq!(r.bytes_written, 1);
1735 assert_eq!(r.rows_written, 4);
1736
1737 let metadata = r.metadata;
1738 assert_eq!(
1739 metadata.encodings().collect::<Vec<_>>(),
1740 vec![Encoding::PLAIN, Encoding::RLE]
1741 );
1742 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1744 }
1745
1746 #[test]
1747 fn test_column_writer_default_encoding_support_bool() {
1748 check_encoding_write_support::<BoolType>(
1749 WriterVersion::PARQUET_1_0,
1750 true,
1751 &[true, false],
1752 None,
1753 &[Encoding::PLAIN, Encoding::RLE],
1754 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1755 );
1756 check_encoding_write_support::<BoolType>(
1757 WriterVersion::PARQUET_1_0,
1758 false,
1759 &[true, false],
1760 None,
1761 &[Encoding::PLAIN, Encoding::RLE],
1762 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1763 );
1764 check_encoding_write_support::<BoolType>(
1765 WriterVersion::PARQUET_2_0,
1766 true,
1767 &[true, false],
1768 None,
1769 &[Encoding::RLE],
1770 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1771 );
1772 check_encoding_write_support::<BoolType>(
1773 WriterVersion::PARQUET_2_0,
1774 false,
1775 &[true, false],
1776 None,
1777 &[Encoding::RLE],
1778 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1779 );
1780 }
1781
1782 #[test]
1783 fn test_column_writer_default_encoding_support_int32() {
1784 check_encoding_write_support::<Int32Type>(
1785 WriterVersion::PARQUET_1_0,
1786 true,
1787 &[1, 2],
1788 Some(0),
1789 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1790 &[
1791 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1792 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1793 ],
1794 );
1795 check_encoding_write_support::<Int32Type>(
1796 WriterVersion::PARQUET_1_0,
1797 false,
1798 &[1, 2],
1799 None,
1800 &[Encoding::PLAIN, Encoding::RLE],
1801 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1802 );
1803 check_encoding_write_support::<Int32Type>(
1804 WriterVersion::PARQUET_2_0,
1805 true,
1806 &[1, 2],
1807 Some(0),
1808 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1809 &[
1810 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1811 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1812 ],
1813 );
1814 check_encoding_write_support::<Int32Type>(
1815 WriterVersion::PARQUET_2_0,
1816 false,
1817 &[1, 2],
1818 None,
1819 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1820 &[encoding_stats(
1821 PageType::DATA_PAGE_V2,
1822 Encoding::DELTA_BINARY_PACKED,
1823 1,
1824 )],
1825 );
1826 }
1827
1828 #[test]
1829 fn test_column_writer_default_encoding_support_int64() {
1830 check_encoding_write_support::<Int64Type>(
1831 WriterVersion::PARQUET_1_0,
1832 true,
1833 &[1, 2],
1834 Some(0),
1835 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1836 &[
1837 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1838 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1839 ],
1840 );
1841 check_encoding_write_support::<Int64Type>(
1842 WriterVersion::PARQUET_1_0,
1843 false,
1844 &[1, 2],
1845 None,
1846 &[Encoding::PLAIN, Encoding::RLE],
1847 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1848 );
1849 check_encoding_write_support::<Int64Type>(
1850 WriterVersion::PARQUET_2_0,
1851 true,
1852 &[1, 2],
1853 Some(0),
1854 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1855 &[
1856 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1857 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1858 ],
1859 );
1860 check_encoding_write_support::<Int64Type>(
1861 WriterVersion::PARQUET_2_0,
1862 false,
1863 &[1, 2],
1864 None,
1865 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1866 &[encoding_stats(
1867 PageType::DATA_PAGE_V2,
1868 Encoding::DELTA_BINARY_PACKED,
1869 1,
1870 )],
1871 );
1872 }
1873
1874 #[test]
1875 fn test_column_writer_default_encoding_support_int96() {
1876 check_encoding_write_support::<Int96Type>(
1877 WriterVersion::PARQUET_1_0,
1878 true,
1879 &[Int96::from(vec![1, 2, 3])],
1880 Some(0),
1881 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1882 &[
1883 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1884 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1885 ],
1886 );
1887 check_encoding_write_support::<Int96Type>(
1888 WriterVersion::PARQUET_1_0,
1889 false,
1890 &[Int96::from(vec![1, 2, 3])],
1891 None,
1892 &[Encoding::PLAIN, Encoding::RLE],
1893 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1894 );
1895 check_encoding_write_support::<Int96Type>(
1896 WriterVersion::PARQUET_2_0,
1897 true,
1898 &[Int96::from(vec![1, 2, 3])],
1899 Some(0),
1900 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1901 &[
1902 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1903 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1904 ],
1905 );
1906 check_encoding_write_support::<Int96Type>(
1907 WriterVersion::PARQUET_2_0,
1908 false,
1909 &[Int96::from(vec![1, 2, 3])],
1910 None,
1911 &[Encoding::PLAIN, Encoding::RLE],
1912 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1913 );
1914 }
1915
1916 #[test]
1917 fn test_column_writer_default_encoding_support_float() {
1918 check_encoding_write_support::<FloatType>(
1919 WriterVersion::PARQUET_1_0,
1920 true,
1921 &[1.0, 2.0],
1922 Some(0),
1923 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1924 &[
1925 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1926 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1927 ],
1928 );
1929 check_encoding_write_support::<FloatType>(
1930 WriterVersion::PARQUET_1_0,
1931 false,
1932 &[1.0, 2.0],
1933 None,
1934 &[Encoding::PLAIN, Encoding::RLE],
1935 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1936 );
1937 check_encoding_write_support::<FloatType>(
1938 WriterVersion::PARQUET_2_0,
1939 true,
1940 &[1.0, 2.0],
1941 Some(0),
1942 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1943 &[
1944 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1945 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1946 ],
1947 );
1948 check_encoding_write_support::<FloatType>(
1949 WriterVersion::PARQUET_2_0,
1950 false,
1951 &[1.0, 2.0],
1952 None,
1953 &[Encoding::PLAIN, Encoding::RLE],
1954 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1955 );
1956 }
1957
1958 #[test]
1959 fn test_column_writer_default_encoding_support_double() {
1960 check_encoding_write_support::<DoubleType>(
1961 WriterVersion::PARQUET_1_0,
1962 true,
1963 &[1.0, 2.0],
1964 Some(0),
1965 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1966 &[
1967 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1968 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1969 ],
1970 );
1971 check_encoding_write_support::<DoubleType>(
1972 WriterVersion::PARQUET_1_0,
1973 false,
1974 &[1.0, 2.0],
1975 None,
1976 &[Encoding::PLAIN, Encoding::RLE],
1977 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1978 );
1979 check_encoding_write_support::<DoubleType>(
1980 WriterVersion::PARQUET_2_0,
1981 true,
1982 &[1.0, 2.0],
1983 Some(0),
1984 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1985 &[
1986 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1987 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1988 ],
1989 );
1990 check_encoding_write_support::<DoubleType>(
1991 WriterVersion::PARQUET_2_0,
1992 false,
1993 &[1.0, 2.0],
1994 None,
1995 &[Encoding::PLAIN, Encoding::RLE],
1996 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1997 );
1998 }
1999
2000 #[test]
2001 fn test_column_writer_default_encoding_support_byte_array() {
2002 check_encoding_write_support::<ByteArrayType>(
2003 WriterVersion::PARQUET_1_0,
2004 true,
2005 &[ByteArray::from(vec![1u8])],
2006 Some(0),
2007 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2008 &[
2009 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2010 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
2011 ],
2012 );
2013 check_encoding_write_support::<ByteArrayType>(
2014 WriterVersion::PARQUET_1_0,
2015 false,
2016 &[ByteArray::from(vec![1u8])],
2017 None,
2018 &[Encoding::PLAIN, Encoding::RLE],
2019 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2020 );
2021 check_encoding_write_support::<ByteArrayType>(
2022 WriterVersion::PARQUET_2_0,
2023 true,
2024 &[ByteArray::from(vec![1u8])],
2025 Some(0),
2026 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2027 &[
2028 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2029 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2030 ],
2031 );
2032 check_encoding_write_support::<ByteArrayType>(
2033 WriterVersion::PARQUET_2_0,
2034 false,
2035 &[ByteArray::from(vec![1u8])],
2036 None,
2037 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2038 &[encoding_stats(
2039 PageType::DATA_PAGE_V2,
2040 Encoding::DELTA_BYTE_ARRAY,
2041 1,
2042 )],
2043 );
2044 }
2045
2046 #[test]
2047 fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2048 check_encoding_write_support::<FixedLenByteArrayType>(
2049 WriterVersion::PARQUET_1_0,
2050 true,
2051 &[ByteArray::from(vec![1u8]).into()],
2052 None,
2053 &[Encoding::PLAIN, Encoding::RLE],
2054 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2055 );
2056 check_encoding_write_support::<FixedLenByteArrayType>(
2057 WriterVersion::PARQUET_1_0,
2058 false,
2059 &[ByteArray::from(vec![1u8]).into()],
2060 None,
2061 &[Encoding::PLAIN, Encoding::RLE],
2062 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2063 );
2064 check_encoding_write_support::<FixedLenByteArrayType>(
2065 WriterVersion::PARQUET_2_0,
2066 true,
2067 &[ByteArray::from(vec![1u8]).into()],
2068 Some(0),
2069 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2070 &[
2071 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2072 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2073 ],
2074 );
2075 check_encoding_write_support::<FixedLenByteArrayType>(
2076 WriterVersion::PARQUET_2_0,
2077 false,
2078 &[ByteArray::from(vec![1u8]).into()],
2079 None,
2080 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2081 &[encoding_stats(
2082 PageType::DATA_PAGE_V2,
2083 Encoding::DELTA_BYTE_ARRAY,
2084 1,
2085 )],
2086 );
2087 }
2088
2089 #[test]
2090 fn test_column_writer_check_metadata() {
2091 let page_writer = get_test_page_writer();
2092 let props = Default::default();
2093 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2094 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2095
2096 let r = writer.close().unwrap();
2097 assert_eq!(r.bytes_written, 20);
2098 assert_eq!(r.rows_written, 4);
2099
2100 let metadata = r.metadata;
2101 assert_eq!(
2102 metadata.encodings().collect::<Vec<_>>(),
2103 vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2104 );
2105 assert_eq!(metadata.num_values(), 4);
2106 assert_eq!(metadata.compressed_size(), 20);
2107 assert_eq!(metadata.uncompressed_size(), 20);
2108 assert_eq!(metadata.data_page_offset(), 0);
2109 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2110 if let Some(stats) = metadata.statistics() {
2111 assert_eq!(stats.null_count_opt(), Some(0));
2112 assert_eq!(stats.distinct_count_opt(), None);
2113 if let Statistics::Int32(stats) = stats {
2114 assert_eq!(stats.min_opt().unwrap(), &1);
2115 assert_eq!(stats.max_opt().unwrap(), &4);
2116 } else {
2117 panic!("expecting Statistics::Int32");
2118 }
2119 } else {
2120 panic!("metadata missing statistics");
2121 }
2122 }
2123
2124 #[test]
2125 fn test_column_writer_check_byte_array_min_max() {
2126 let page_writer = get_test_page_writer();
2127 let props = Default::default();
2128 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2129 writer
2130 .write_batch(
2131 &[
2132 ByteArray::from(vec![
2133 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2134 35u8, 231u8, 90u8, 0u8, 0u8,
2135 ]),
2136 ByteArray::from(vec![
2137 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2138 152u8, 177u8, 56u8, 0u8, 0u8,
2139 ]),
2140 ByteArray::from(vec![
2141 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2142 0u8,
2143 ]),
2144 ByteArray::from(vec![
2145 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2146 44u8, 0u8, 0u8,
2147 ]),
2148 ],
2149 None,
2150 None,
2151 )
2152 .unwrap();
2153 let metadata = writer.close().unwrap().metadata;
2154 if let Some(stats) = metadata.statistics() {
2155 if let Statistics::ByteArray(stats) = stats {
2156 assert_eq!(
2157 stats.min_opt().unwrap(),
2158 &ByteArray::from(vec![
2159 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2160 35u8, 231u8, 90u8, 0u8, 0u8,
2161 ])
2162 );
2163 assert_eq!(
2164 stats.max_opt().unwrap(),
2165 &ByteArray::from(vec![
2166 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2167 44u8, 0u8, 0u8,
2168 ])
2169 );
2170 } else {
2171 panic!("expecting Statistics::ByteArray");
2172 }
2173 } else {
2174 panic!("metadata missing statistics");
2175 }
2176 }
2177
2178 #[test]
2179 fn test_column_writer_uint32_converted_type_min_max() {
2180 let page_writer = get_test_page_writer();
2181 let props = Default::default();
2182 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2183 page_writer,
2184 0,
2185 0,
2186 props,
2187 );
2188 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2189 let metadata = writer.close().unwrap().metadata;
2190 if let Some(stats) = metadata.statistics() {
2191 if let Statistics::Int32(stats) = stats {
2192 assert_eq!(stats.min_opt().unwrap(), &0,);
2193 assert_eq!(stats.max_opt().unwrap(), &5,);
2194 } else {
2195 panic!("expecting Statistics::Int32");
2196 }
2197 } else {
2198 panic!("metadata missing statistics");
2199 }
2200 }
2201
2202 #[test]
2203 fn test_column_writer_precalculated_statistics() {
2204 let page_writer = get_test_page_writer();
2205 let props = Arc::new(
2206 WriterProperties::builder()
2207 .set_statistics_enabled(EnabledStatistics::Chunk)
2208 .build(),
2209 );
2210 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2211 writer
2212 .write_batch_with_statistics(
2213 &[1, 2, 3, 4],
2214 None,
2215 None,
2216 Some(&-17),
2217 Some(&9000),
2218 Some(55),
2219 )
2220 .unwrap();
2221
2222 let r = writer.close().unwrap();
2223 assert_eq!(r.bytes_written, 20);
2224 assert_eq!(r.rows_written, 4);
2225
2226 let metadata = r.metadata;
2227 assert_eq!(
2228 metadata.encodings().collect::<Vec<_>>(),
2229 vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2230 );
2231 assert_eq!(metadata.num_values(), 4);
2232 assert_eq!(metadata.compressed_size(), 20);
2233 assert_eq!(metadata.uncompressed_size(), 20);
2234 assert_eq!(metadata.data_page_offset(), 0);
2235 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2236 if let Some(stats) = metadata.statistics() {
2237 assert_eq!(stats.null_count_opt(), Some(0));
2238 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2239 if let Statistics::Int32(stats) = stats {
2240 assert_eq!(stats.min_opt().unwrap(), &-17);
2241 assert_eq!(stats.max_opt().unwrap(), &9000);
2242 } else {
2243 panic!("expecting Statistics::Int32");
2244 }
2245 } else {
2246 panic!("metadata missing statistics");
2247 }
2248 }
2249
2250 #[test]
2251 fn test_mixed_precomputed_statistics() {
2252 let mut buf = Vec::with_capacity(100);
2253 let mut write = TrackedWrite::new(&mut buf);
2254 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2255 let props = Arc::new(
2256 WriterProperties::builder()
2257 .set_write_page_header_statistics(true)
2258 .build(),
2259 );
2260 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2261
2262 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2263 writer
2264 .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2265 .unwrap();
2266
2267 let r = writer.close().unwrap();
2268
2269 let stats = r.metadata.statistics().unwrap();
2270 assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2271 assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2272 assert_eq!(stats.null_count_opt(), Some(0));
2273 assert!(stats.distinct_count_opt().is_none());
2274
2275 drop(write);
2276
2277 let props = ReaderProperties::builder()
2278 .set_backward_compatible_lz4(false)
2279 .set_read_page_statistics(true)
2280 .build();
2281 let reader = SerializedPageReader::new_with_properties(
2282 Arc::new(Bytes::from(buf)),
2283 &r.metadata,
2284 r.rows_written as usize,
2285 None,
2286 Arc::new(props),
2287 )
2288 .unwrap();
2289
2290 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2291 assert_eq!(pages.len(), 2);
2292
2293 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2294 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2295
2296 let page_statistics = pages[1].statistics().unwrap();
2297 assert_eq!(
2298 page_statistics.min_bytes_opt().unwrap(),
2299 1_i32.to_le_bytes()
2300 );
2301 assert_eq!(
2302 page_statistics.max_bytes_opt().unwrap(),
2303 7_i32.to_le_bytes()
2304 );
2305 assert_eq!(page_statistics.null_count_opt(), Some(0));
2306 assert!(page_statistics.distinct_count_opt().is_none());
2307 }
2308
    #[test]
    fn test_disabled_statistics() {
        // Verifies that `EnabledStatistics::None` suppresses statistics both in
        // the chunk metadata and in the written pages, while v2 data pages still
        // carry their mandatory value/null/row counts.
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        // 6 definition levels with 4 non-null values => 2 nulls.
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        // No chunk-level statistics were recorded.
        assert!(r.metadata.statistics().is_none());

        // Release the borrow on `buf` so the buffer can be read back below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                // Page-level statistics are suppressed as well.
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2364
2365 #[test]
2366 fn test_column_writer_empty_column_roundtrip() {
2367 let props = Default::default();
2368 column_roundtrip::<Int32Type>(props, &[], None, None);
2369 }
2370
2371 #[test]
2372 fn test_column_writer_non_nullable_values_roundtrip() {
2373 let props = Default::default();
2374 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2375 }
2376
2377 #[test]
2378 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2379 let props = Default::default();
2380 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2381 }
2382
2383 #[test]
2384 fn test_column_writer_nullable_repeated_values_roundtrip() {
2385 let props = Default::default();
2386 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2387 }
2388
2389 #[test]
2390 fn test_column_writer_dictionary_fallback_small_data_page() {
2391 let props = WriterProperties::builder()
2392 .set_dictionary_page_size_limit(32)
2393 .set_data_page_size_limit(32)
2394 .build();
2395 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2396 }
2397
2398 #[test]
2399 fn test_column_writer_small_write_batch_size() {
2400 for i in &[1usize, 2, 5, 10, 11, 1023] {
2401 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2402
2403 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2404 }
2405 }
2406
2407 #[test]
2408 fn test_column_writer_dictionary_disabled_v1() {
2409 let props = WriterProperties::builder()
2410 .set_writer_version(WriterVersion::PARQUET_1_0)
2411 .set_dictionary_enabled(false)
2412 .build();
2413 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2414 }
2415
2416 #[test]
2417 fn test_column_writer_dictionary_disabled_v2() {
2418 let props = WriterProperties::builder()
2419 .set_writer_version(WriterVersion::PARQUET_2_0)
2420 .set_dictionary_enabled(false)
2421 .build();
2422 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2423 }
2424
2425 #[test]
2426 fn test_column_writer_compression_v1() {
2427 let props = WriterProperties::builder()
2428 .set_writer_version(WriterVersion::PARQUET_1_0)
2429 .set_compression(Compression::SNAPPY)
2430 .build();
2431 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2432 }
2433
2434 #[test]
2435 fn test_column_writer_compression_v2() {
2436 let props = WriterProperties::builder()
2437 .set_writer_version(WriterVersion::PARQUET_2_0)
2438 .set_compression(Compression::SNAPPY)
2439 .build();
2440 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2441 }
2442
    #[test]
    fn test_column_writer_add_data_pages_with_dict() {
        // A tiny data page size limit forces the dictionary-encoded data to
        // split into multiple data pages; verify the resulting page sequence and
        // the per-encoding page statistics recorded in the chunk metadata.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_data_page_size_limit(10)
                .set_write_batch_size(3) .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the borrow on `file` so it can be re-read below.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        // Collect (page type, value count, buffer size) for each page written.
        let mut res = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            res.push((page.page_type(), page.num_values(), page.buffer().len()));
        }
        // One dictionary page (10 distinct 4-byte values = 40 bytes), then two
        // data pages split by the 10-byte page limit.
        assert_eq!(
            res,
            vec![
                (PageType::DICTIONARY_PAGE, 10, 40),
                (PageType::DATA_PAGE, 9, 10),
                (PageType::DATA_PAGE, 1, 3),
            ]
        );
        assert_eq!(
            r.metadata.page_encoding_stats(),
            Some(&vec![
                PageEncodingStats {
                    page_type: PageType::DICTIONARY_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 1
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE_DICTIONARY,
                    count: 2,
                }
            ])
        );
    }
2505
2506 #[test]
2507 fn test_column_writer_column_data_page_size_limit() {
2508 let props = Arc::new(
2509 WriterProperties::builder()
2510 .set_writer_version(WriterVersion::PARQUET_1_0)
2511 .set_dictionary_enabled(false)
2512 .set_data_page_size_limit(1000)
2513 .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
2514 .set_write_batch_size(3)
2515 .build(),
2516 );
2517 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2518
2519 let col_values =
2520 write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
2521 let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);
2522
2523 assert_eq!(col_values, vec![3, 3, 3, 1]);
2524 assert_eq!(other_values, vec![10]);
2525 }
2526
2527 #[test]
2528 fn test_bool_statistics() {
2529 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2530 assert!(!stats.is_min_max_backwards_compatible());
2533 if let Statistics::Boolean(stats) = stats {
2534 assert_eq!(stats.min_opt().unwrap(), &false);
2535 assert_eq!(stats.max_opt().unwrap(), &true);
2536 } else {
2537 panic!("expecting Statistics::Boolean, got {stats:?}");
2538 }
2539 }
2540
2541 #[test]
2542 fn test_int32_statistics() {
2543 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2544 assert!(stats.is_min_max_backwards_compatible());
2545 if let Statistics::Int32(stats) = stats {
2546 assert_eq!(stats.min_opt().unwrap(), &-2);
2547 assert_eq!(stats.max_opt().unwrap(), &3);
2548 } else {
2549 panic!("expecting Statistics::Int32, got {stats:?}");
2550 }
2551 }
2552
2553 #[test]
2554 fn test_int64_statistics() {
2555 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2556 assert!(stats.is_min_max_backwards_compatible());
2557 if let Statistics::Int64(stats) = stats {
2558 assert_eq!(stats.min_opt().unwrap(), &-2);
2559 assert_eq!(stats.max_opt().unwrap(), &3);
2560 } else {
2561 panic!("expecting Statistics::Int64, got {stats:?}");
2562 }
2563 }
2564
2565 #[test]
2566 fn test_int96_statistics() {
2567 let input = vec![
2568 Int96::from(vec![1, 20, 30]),
2569 Int96::from(vec![3, 20, 10]),
2570 Int96::from(vec![0, 20, 30]),
2571 Int96::from(vec![2, 20, 30]),
2572 ]
2573 .into_iter()
2574 .collect::<Vec<Int96>>();
2575
2576 let stats = statistics_roundtrip::<Int96Type>(&input);
2577 assert!(!stats.is_min_max_backwards_compatible());
2578 if let Statistics::Int96(stats) = stats {
2579 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2580 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2581 } else {
2582 panic!("expecting Statistics::Int96, got {stats:?}");
2583 }
2584 }
2585
2586 #[test]
2587 fn test_float_statistics() {
2588 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2589 assert!(stats.is_min_max_backwards_compatible());
2590 if let Statistics::Float(stats) = stats {
2591 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2592 assert_eq!(stats.max_opt().unwrap(), &3.0);
2593 } else {
2594 panic!("expecting Statistics::Float, got {stats:?}");
2595 }
2596 }
2597
2598 #[test]
2599 fn test_double_statistics() {
2600 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2601 assert!(stats.is_min_max_backwards_compatible());
2602 if let Statistics::Double(stats) = stats {
2603 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2604 assert_eq!(stats.max_opt().unwrap(), &3.0);
2605 } else {
2606 panic!("expecting Statistics::Double, got {stats:?}");
2607 }
2608 }
2609
2610 #[test]
2611 fn test_byte_array_statistics() {
2612 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2613 .iter()
2614 .map(|&s| s.into())
2615 .collect::<Vec<_>>();
2616
2617 let stats = statistics_roundtrip::<ByteArrayType>(&input);
2618 assert!(!stats.is_min_max_backwards_compatible());
2619 if let Statistics::ByteArray(stats) = stats {
2620 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2621 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2622 } else {
2623 panic!("expecting Statistics::ByteArray, got {stats:?}");
2624 }
2625 }
2626
2627 #[test]
2628 fn test_fixed_len_byte_array_statistics() {
2629 let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
2630 .iter()
2631 .map(|&s| ByteArray::from(s).into())
2632 .collect::<Vec<_>>();
2633
2634 let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2635 assert!(!stats.is_min_max_backwards_compatible());
2636 if let Statistics::FixedLenByteArray(stats) = stats {
2637 let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
2638 assert_eq!(stats.min_opt().unwrap(), &expected_min);
2639 let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
2640 assert_eq!(stats.max_opt().unwrap(), &expected_max);
2641 } else {
2642 panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2643 }
2644 }
2645
2646 #[test]
2647 fn test_column_writer_check_float16_min_max() {
2648 let input = [
2649 -f16::ONE,
2650 f16::from_f32(3.0),
2651 -f16::from_f32(2.0),
2652 f16::from_f32(2.0),
2653 ]
2654 .into_iter()
2655 .map(|s| ByteArray::from(s).into())
2656 .collect::<Vec<_>>();
2657
2658 let stats = float16_statistics_roundtrip(&input);
2659 assert!(stats.is_min_max_backwards_compatible());
2660 assert_eq!(
2661 stats.min_opt().unwrap(),
2662 &ByteArray::from(-f16::from_f32(2.0))
2663 );
2664 assert_eq!(
2665 stats.max_opt().unwrap(),
2666 &ByteArray::from(f16::from_f32(3.0))
2667 );
2668 }
2669
2670 #[test]
2671 fn test_column_writer_check_float16_nan_middle() {
2672 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2673 .into_iter()
2674 .map(|s| ByteArray::from(s).into())
2675 .collect::<Vec<_>>();
2676
2677 let stats = float16_statistics_roundtrip(&input);
2678 assert!(stats.is_min_max_backwards_compatible());
2679 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2680 assert_eq!(
2681 stats.max_opt().unwrap(),
2682 &ByteArray::from(f16::ONE + f16::ONE)
2683 );
2684 }
2685
2686 #[test]
2687 fn test_float16_statistics_nan_middle() {
2688 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2689 .into_iter()
2690 .map(|s| ByteArray::from(s).into())
2691 .collect::<Vec<_>>();
2692
2693 let stats = float16_statistics_roundtrip(&input);
2694 assert!(stats.is_min_max_backwards_compatible());
2695 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2696 assert_eq!(
2697 stats.max_opt().unwrap(),
2698 &ByteArray::from(f16::ONE + f16::ONE)
2699 );
2700 }
2701
2702 #[test]
2703 fn test_float16_statistics_nan_start() {
2704 let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2705 .into_iter()
2706 .map(|s| ByteArray::from(s).into())
2707 .collect::<Vec<_>>();
2708
2709 let stats = float16_statistics_roundtrip(&input);
2710 assert!(stats.is_min_max_backwards_compatible());
2711 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2712 assert_eq!(
2713 stats.max_opt().unwrap(),
2714 &ByteArray::from(f16::ONE + f16::ONE)
2715 );
2716 }
2717
2718 #[test]
2719 fn test_float16_statistics_nan_only() {
2720 let input = [f16::NAN, f16::NAN]
2721 .into_iter()
2722 .map(|s| ByteArray::from(s).into())
2723 .collect::<Vec<_>>();
2724
2725 let stats = float16_statistics_roundtrip(&input);
2726 assert!(stats.min_bytes_opt().is_none());
2727 assert!(stats.max_bytes_opt().is_none());
2728 assert!(stats.is_min_max_backwards_compatible());
2729 }
2730
2731 #[test]
2732 fn test_float16_statistics_zero_only() {
2733 let input = [f16::ZERO]
2734 .into_iter()
2735 .map(|s| ByteArray::from(s).into())
2736 .collect::<Vec<_>>();
2737
2738 let stats = float16_statistics_roundtrip(&input);
2739 assert!(stats.is_min_max_backwards_compatible());
2740 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2741 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2742 }
2743
2744 #[test]
2745 fn test_float16_statistics_neg_zero_only() {
2746 let input = [f16::NEG_ZERO]
2747 .into_iter()
2748 .map(|s| ByteArray::from(s).into())
2749 .collect::<Vec<_>>();
2750
2751 let stats = float16_statistics_roundtrip(&input);
2752 assert!(stats.is_min_max_backwards_compatible());
2753 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2754 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2755 }
2756
2757 #[test]
2758 fn test_float16_statistics_zero_min() {
2759 let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2760 .into_iter()
2761 .map(|s| ByteArray::from(s).into())
2762 .collect::<Vec<_>>();
2763
2764 let stats = float16_statistics_roundtrip(&input);
2765 assert!(stats.is_min_max_backwards_compatible());
2766 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2767 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2768 }
2769
2770 #[test]
2771 fn test_float16_statistics_neg_zero_max() {
2772 let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2773 .into_iter()
2774 .map(|s| ByteArray::from(s).into())
2775 .collect::<Vec<_>>();
2776
2777 let stats = float16_statistics_roundtrip(&input);
2778 assert!(stats.is_min_max_backwards_compatible());
2779 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2780 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2781 }
2782
2783 #[test]
2784 fn test_float_statistics_nan_middle() {
2785 let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2786 assert!(stats.is_min_max_backwards_compatible());
2787 if let Statistics::Float(stats) = stats {
2788 assert_eq!(stats.min_opt().unwrap(), &1.0);
2789 assert_eq!(stats.max_opt().unwrap(), &2.0);
2790 } else {
2791 panic!("expecting Statistics::Float");
2792 }
2793 }
2794
2795 #[test]
2796 fn test_float_statistics_nan_start() {
2797 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2798 assert!(stats.is_min_max_backwards_compatible());
2799 if let Statistics::Float(stats) = stats {
2800 assert_eq!(stats.min_opt().unwrap(), &1.0);
2801 assert_eq!(stats.max_opt().unwrap(), &2.0);
2802 } else {
2803 panic!("expecting Statistics::Float");
2804 }
2805 }
2806
2807 #[test]
2808 fn test_float_statistics_nan_only() {
2809 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2810 assert!(stats.min_bytes_opt().is_none());
2811 assert!(stats.max_bytes_opt().is_none());
2812 assert!(stats.is_min_max_backwards_compatible());
2813 assert!(matches!(stats, Statistics::Float(_)));
2814 }
2815
2816 #[test]
2817 fn test_float_statistics_zero_only() {
2818 let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2819 assert!(stats.is_min_max_backwards_compatible());
2820 if let Statistics::Float(stats) = stats {
2821 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2822 assert!(stats.min_opt().unwrap().is_sign_negative());
2823 assert_eq!(stats.max_opt().unwrap(), &0.0);
2824 assert!(stats.max_opt().unwrap().is_sign_positive());
2825 } else {
2826 panic!("expecting Statistics::Float");
2827 }
2828 }
2829
2830 #[test]
2831 fn test_float_statistics_neg_zero_only() {
2832 let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2833 assert!(stats.is_min_max_backwards_compatible());
2834 if let Statistics::Float(stats) = stats {
2835 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2836 assert!(stats.min_opt().unwrap().is_sign_negative());
2837 assert_eq!(stats.max_opt().unwrap(), &0.0);
2838 assert!(stats.max_opt().unwrap().is_sign_positive());
2839 } else {
2840 panic!("expecting Statistics::Float");
2841 }
2842 }
2843
2844 #[test]
2845 fn test_float_statistics_zero_min() {
2846 let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2847 assert!(stats.is_min_max_backwards_compatible());
2848 if let Statistics::Float(stats) = stats {
2849 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2850 assert!(stats.min_opt().unwrap().is_sign_negative());
2851 assert_eq!(stats.max_opt().unwrap(), &2.0);
2852 } else {
2853 panic!("expecting Statistics::Float");
2854 }
2855 }
2856
2857 #[test]
2858 fn test_float_statistics_neg_zero_max() {
2859 let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2860 assert!(stats.is_min_max_backwards_compatible());
2861 if let Statistics::Float(stats) = stats {
2862 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2863 assert_eq!(stats.max_opt().unwrap(), &0.0);
2864 assert!(stats.max_opt().unwrap().is_sign_positive());
2865 } else {
2866 panic!("expecting Statistics::Float");
2867 }
2868 }
2869
2870 #[test]
2871 fn test_double_statistics_nan_middle() {
2872 let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2873 assert!(stats.is_min_max_backwards_compatible());
2874 if let Statistics::Double(stats) = stats {
2875 assert_eq!(stats.min_opt().unwrap(), &1.0);
2876 assert_eq!(stats.max_opt().unwrap(), &2.0);
2877 } else {
2878 panic!("expecting Statistics::Double");
2879 }
2880 }
2881
2882 #[test]
2883 fn test_double_statistics_nan_start() {
2884 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2885 assert!(stats.is_min_max_backwards_compatible());
2886 if let Statistics::Double(stats) = stats {
2887 assert_eq!(stats.min_opt().unwrap(), &1.0);
2888 assert_eq!(stats.max_opt().unwrap(), &2.0);
2889 } else {
2890 panic!("expecting Statistics::Double");
2891 }
2892 }
2893
2894 #[test]
2895 fn test_double_statistics_nan_only() {
2896 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2897 assert!(stats.min_bytes_opt().is_none());
2898 assert!(stats.max_bytes_opt().is_none());
2899 assert!(matches!(stats, Statistics::Double(_)));
2900 assert!(stats.is_min_max_backwards_compatible());
2901 }
2902
2903 #[test]
2904 fn test_double_statistics_zero_only() {
2905 let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2906 assert!(stats.is_min_max_backwards_compatible());
2907 if let Statistics::Double(stats) = stats {
2908 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2909 assert!(stats.min_opt().unwrap().is_sign_negative());
2910 assert_eq!(stats.max_opt().unwrap(), &0.0);
2911 assert!(stats.max_opt().unwrap().is_sign_positive());
2912 } else {
2913 panic!("expecting Statistics::Double");
2914 }
2915 }
2916
2917 #[test]
2918 fn test_double_statistics_neg_zero_only() {
2919 let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2920 assert!(stats.is_min_max_backwards_compatible());
2921 if let Statistics::Double(stats) = stats {
2922 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2923 assert!(stats.min_opt().unwrap().is_sign_negative());
2924 assert_eq!(stats.max_opt().unwrap(), &0.0);
2925 assert!(stats.max_opt().unwrap().is_sign_positive());
2926 } else {
2927 panic!("expecting Statistics::Double");
2928 }
2929 }
2930
2931 #[test]
2932 fn test_double_statistics_zero_min() {
2933 let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2934 assert!(stats.is_min_max_backwards_compatible());
2935 if let Statistics::Double(stats) = stats {
2936 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2937 assert!(stats.min_opt().unwrap().is_sign_negative());
2938 assert_eq!(stats.max_opt().unwrap(), &2.0);
2939 } else {
2940 panic!("expecting Statistics::Double");
2941 }
2942 }
2943
2944 #[test]
2945 fn test_double_statistics_neg_zero_max() {
2946 let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2947 assert!(stats.is_min_max_backwards_compatible());
2948 if let Statistics::Double(stats) = stats {
2949 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2950 assert_eq!(stats.max_opt().unwrap(), &0.0);
2951 assert!(stats.max_opt().unwrap().is_sign_positive());
2952 } else {
2953 panic!("expecting Statistics::Double");
2954 }
2955 }
2956
2957 #[test]
2958 fn test_compare_greater_byte_array_decimals() {
2959 assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2960 assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2961 assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2962 assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2963 assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2964 assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2965 assert!(!compare_greater_byte_array_decimals(
2966 &[0u8, 1u8,],
2967 &[1u8, 0u8,],
2968 ),);
2969 assert!(!compare_greater_byte_array_decimals(
2970 &[255u8, 35u8, 0u8, 0u8,],
2971 &[0u8,],
2972 ),);
2973 assert!(compare_greater_byte_array_decimals(
2974 &[0u8,],
2975 &[255u8, 35u8, 0u8, 0u8,],
2976 ),);
2977 }
2978
2979 #[test]
2980 fn test_column_index_with_null_pages() {
2981 let page_writer = get_test_page_writer();
2983 let props = Default::default();
2984 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2985 writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2986
2987 let r = writer.close().unwrap();
2988 assert!(r.column_index.is_some());
2989 let col_idx = r.column_index.unwrap();
2990 let col_idx = match col_idx {
2991 ColumnIndexMetaData::INT32(col_idx) => col_idx,
2992 _ => panic!("wrong stats type"),
2993 };
2994 assert!(col_idx.is_null_page(0));
2996 assert!(col_idx.min_value(0).is_none());
2998 assert!(col_idx.max_value(0).is_none());
2999 assert!(col_idx.null_count(0).is_some());
3001 assert_eq!(col_idx.null_count(0), Some(4));
3002 assert!(col_idx.repetition_level_histogram(0).is_none());
3004 assert!(col_idx.definition_level_histogram(0).is_some());
3006 assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
3007 }
3008
3009 #[test]
3010 fn test_column_offset_index_metadata() {
3011 let page_writer = get_test_page_writer();
3014 let props = Default::default();
3015 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3016 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3017 writer.flush_data_pages().unwrap();
3019 writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3021
3022 let r = writer.close().unwrap();
3023 let column_index = r.column_index.unwrap();
3024 let offset_index = r.offset_index.unwrap();
3025
3026 assert_eq!(8, r.rows_written);
3027
3028 let column_index = match column_index {
3030 ColumnIndexMetaData::INT32(column_index) => column_index,
3031 _ => panic!("wrong stats type"),
3032 };
3033 assert_eq!(2, column_index.num_pages());
3034 assert_eq!(2, offset_index.page_locations.len());
3035 assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3036 for idx in 0..2 {
3037 assert!(!column_index.is_null_page(idx));
3038 assert_eq!(0, column_index.null_count(0).unwrap());
3039 }
3040
3041 if let Some(stats) = r.metadata.statistics() {
3042 assert_eq!(stats.null_count_opt(), Some(0));
3043 assert_eq!(stats.distinct_count_opt(), None);
3044 if let Statistics::Int32(stats) = stats {
3045 assert_eq!(stats.min_opt(), column_index.min_value(1));
3049 assert_eq!(stats.max_opt(), column_index.max_value(1));
3050 } else {
3051 panic!("expecting Statistics::Int32");
3052 }
3053 } else {
3054 panic!("metadata missing statistics");
3055 }
3056
3057 assert_eq!(0, offset_index.page_locations[0].first_row_index);
3059 assert_eq!(4, offset_index.page_locations[1].first_row_index);
3060 }
3061
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        // With statistics truncation disabled, the chunk statistics keep the
        // full 200-byte values, while the column index still truncates its
        // min/max to DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.
        let page_writer = get_test_page_writer();
        let props = WriterProperties::builder()
            .set_statistics_truncate_length(None) .build()
            .into();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // 'a' * 200, 'p' * 200, 'b' * 200: min is the 'a' run, max the 'p' run.
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(3, r.rows_written);

        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                // Index values are truncated, so they differ from the full stats.
                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);

                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value, &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // The truncated max is rounded up: its final byte is incremented
                // so the value remains a valid upper bound.
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3136
    #[test]
    fn test_column_offset_index_truncating_spec_example() {
        // Column-index truncation with a truncate length of 1: the min is
        // truncated down and the max is truncated up (last byte incremented) so
        // both remain valid bounds for the original value.
        let page_writer = get_test_page_writer();

        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 1];
        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(1, r.rows_written);

        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(_stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                assert_eq!(column_index_min_value.len(), 1);
                assert_eq!(column_index_max_value.len(), 1);

                // "B" is a lower bound for "Blart..."; "C" ('B' + 1) an upper bound.
                assert_eq!("B".as_bytes(), column_index_min_value);
                assert_eq!("C".as_bytes(), column_index_max_value);

                // Chunk statistics are not truncated here, so they differ from
                // the truncated column-index values.
                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3196
    #[test]
    fn test_float16_min_max_no_truncation() {
        // Float16 min/max must never be truncated, even when the column-index
        // truncate length (1 here) is smaller than the 2-byte value size.
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, props);

        let expected_value = f16::PI.to_le_bytes().to_vec();
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // Column-index min/max retain the full 2-byte value.
        let column_index = r.column_index.unwrap();
        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };
        let column_index_min_bytes = column_index.min_value(0).unwrap();
        let column_index_max_bytes = column_index.max_value(0).unwrap();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // Chunk statistics likewise keep the untruncated value.
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }
3235
    #[test]
    fn test_decimal_min_max_no_truncation() {
        // Decimal fixed-len-byte-array min/max must never be truncated, even
        // when the column-index truncate length (1) is smaller than the value.
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer =
            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        // A 16-byte two's-complement decimal value.
        let expected_value = vec![
            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
            231u8, 90u8, 0u8, 0u8,
        ];
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // Column-index min/max retain the full 16-byte value.
        let column_index = r.column_index.unwrap();
        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };
        let column_index_min_bytes = column_index.min_value(0).unwrap();
        let column_index_max_bytes = column_index.max_value(0).unwrap();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // Chunk statistics likewise keep the untruncated value.
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }
3278
    #[test]
    fn test_statistics_truncating_byte_array_default() {
        // With default writer properties, byte-array chunk statistics are
        // truncated to 64 bytes and flagged as inexact.
        let page_writer = get_test_page_writer();

        let props = WriterProperties::builder().build().into();
        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![ByteArray::default(); 1];
        data[0].set_data(Bytes::from(String::from(
            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
        )));
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        if let Statistics::ByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            // Both bounds were truncated, so neither is exact.
            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            let expected_len = 64;
            assert_eq!(min_value.len(), expected_len);
            assert_eq!(max_value.len(), expected_len);

            // The min is truncated down; the max has its final byte incremented
            // ('y' -> 'z') so it remains a valid upper bound.
            let expected_min =
                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
            assert_eq!(expected_min, min_value.as_bytes());
            let expected_max =
                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
            assert_eq!(expected_max, max_value.as_bytes());
        } else {
            panic!("expecting Statistics::ByteArray");
        }
    }
3321
3322 #[test]
3323 fn test_statistics_truncating_byte_array() {
3324 let page_writer = get_test_page_writer();
3325
3326 const TEST_TRUNCATE_LENGTH: usize = 1;
3327
3328 let builder =
3330 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3331 let props = Arc::new(builder.build());
3332 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3333
3334 let mut data = vec![ByteArray::default(); 1];
3335 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3337
3338 writer.write_batch(&data, None, None).unwrap();
3339
3340 writer.flush_data_pages().unwrap();
3341
3342 let r = writer.close().unwrap();
3343
3344 assert_eq!(1, r.rows_written);
3345
3346 let stats = r.metadata.statistics().expect("statistics");
3347 assert_eq!(stats.null_count_opt(), Some(0));
3348 assert_eq!(stats.distinct_count_opt(), None);
3349 if let Statistics::ByteArray(_stats) = stats {
3350 let min_value = _stats.min_opt().unwrap();
3351 let max_value = _stats.max_opt().unwrap();
3352
3353 assert!(!_stats.min_is_exact());
3354 assert!(!_stats.max_is_exact());
3355
3356 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3357 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3358
3359 assert_eq!("B".as_bytes(), min_value.as_bytes());
3360 assert_eq!("C".as_bytes(), max_value.as_bytes());
3361 } else {
3362 panic!("expecting Statistics::ByteArray");
3363 }
3364 }
3365
3366 #[test]
3367 fn test_statistics_truncating_fixed_len_byte_array() {
3368 let page_writer = get_test_page_writer();
3369
3370 const TEST_TRUNCATE_LENGTH: usize = 1;
3371
3372 let builder =
3374 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3375 let props = Arc::new(builder.build());
3376 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3377
3378 let mut data = vec![FixedLenByteArray::default(); 1];
3379
3380 const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3381 const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3382
3383 const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3385 [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3386
3387 data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3389
3390 writer.write_batch(&data, None, None).unwrap();
3391
3392 writer.flush_data_pages().unwrap();
3393
3394 let r = writer.close().unwrap();
3395
3396 assert_eq!(1, r.rows_written);
3397
3398 let stats = r.metadata.statistics().expect("statistics");
3399 assert_eq!(stats.null_count_opt(), Some(0));
3400 assert_eq!(stats.distinct_count_opt(), None);
3401 if let Statistics::FixedLenByteArray(_stats) = stats {
3402 let min_value = _stats.min_opt().unwrap();
3403 let max_value = _stats.max_opt().unwrap();
3404
3405 assert!(!_stats.min_is_exact());
3406 assert!(!_stats.max_is_exact());
3407
3408 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3409 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3410
3411 assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3412 assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3413
3414 let reconstructed_min = i128::from_be_bytes([
3415 min_value.as_bytes()[0],
3416 0,
3417 0,
3418 0,
3419 0,
3420 0,
3421 0,
3422 0,
3423 0,
3424 0,
3425 0,
3426 0,
3427 0,
3428 0,
3429 0,
3430 0,
3431 ]);
3432
3433 let reconstructed_max = i128::from_be_bytes([
3434 max_value.as_bytes()[0],
3435 0,
3436 0,
3437 0,
3438 0,
3439 0,
3440 0,
3441 0,
3442 0,
3443 0,
3444 0,
3445 0,
3446 0,
3447 0,
3448 0,
3449 0,
3450 ]);
3451
3452 println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3454 assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3455 println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3456 assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3457 } else {
3458 panic!("expecting Statistics::FixedLenByteArray");
3459 }
3460 }
3461
3462 #[test]
3463 fn test_send() {
3464 fn test<T: Send>() {}
3465 test::<ColumnWriterImpl<Int32Type>>();
3466 }
3467
3468 #[test]
3469 fn test_increment() {
3470 let v = increment(vec![0, 0, 0]).unwrap();
3471 assert_eq!(&v, &[0, 0, 1]);
3472
3473 let v = increment(vec![0, 255, 255]).unwrap();
3475 assert_eq!(&v, &[1, 0, 0]);
3476
3477 let v = increment(vec![255, 255, 255]);
3479 assert!(v.is_none());
3480 }
3481
3482 #[test]
3483 fn test_increment_utf8() {
3484 let test_inc = |o: &str, expected: &str| {
3485 if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3486 assert_eq!(v, expected);
3488 assert!(*v > *o);
3490 let mut greater = ByteArray::new();
3492 greater.set_data(Bytes::from(v));
3493 let mut original = ByteArray::new();
3494 original.set_data(Bytes::from(o.as_bytes().to_vec()));
3495 assert!(greater > original);
3496 } else {
3497 panic!("Expected incremented UTF8 string to also be valid.");
3498 }
3499 };
3500
3501 test_inc("hello", "hellp");
3503
3504 test_inc("a\u{7f}", "b");
3506
3507 assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3509
3510 test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3512
3513 test_inc("éééé", "éééê");
3515
3516 test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3518
3519 test_inc("a\u{7ff}", "b");
3521
3522 assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3524
3525 test_inc("ࠀࠀ", "ࠀࠁ");
3528
3529 test_inc("a\u{ffff}", "b");
3531
3532 assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3534
3535 test_inc("𐀀𐀀", "𐀀𐀁");
3537
3538 test_inc("a\u{10ffff}", "b");
3540
3541 assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3543
3544 test_inc("a\u{D7FF}", "b");
3547 }
3548
    /// UTF-8-aware truncation (and truncate-then-increment): the result must
    /// never split a codepoint, and increment returns `None` when no valid
    /// greater value of the allowed length exists.
    #[test]
    fn test_truncate_utf8() {
        // Truncating to the full byte length returns the input unchanged.
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // 13 bytes rounds down to the nearest codepoint boundary (10 bytes).
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // A 3-byte codepoint cannot fit in 1 byte, so nothing remains.
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // ASCII: drop one byte, then bump the new last byte.
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte codepoints: keep three chars (6 bytes), increment the last.
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // Incrementing U+00FF widens the last codepoint to U+0100.
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // U+07FF is the largest 2-byte codepoint -> cannot increment.
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte codepoints: keep two chars, increment the last.
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // U+FFFF is the largest 3-byte codepoint -> cannot increment.
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte codepoints: keep two chars, increment the last.
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // U+10FFFF is the maximum codepoint -> cannot increment.
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
3600
    /// A UTF8-annotated column containing invalid UTF-8 bytes must still
    /// produce truncated statistics; the truncation falls back to binary
    /// handling, and the max bound's last byte is incremented (0x80 -> 0x81)
    /// so it remains an upper bound.
    #[test]
    fn test_byte_array_truncate_invalid_utf8_statistics() {
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        // Seven values of 32 bytes 0x80 each — not valid UTF-8.
        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .set_statistics_truncate_length(Some(8))
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();
        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
        assert!(!stats.max_is_exact());
        // Truncated to 8 bytes with the final byte incremented.
        assert_eq!(
            stats.max_bytes_opt().map(|v| v.to_vec()),
            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
        );
    }
3643
3644 #[test]
3645 fn test_increment_max_binary_chars() {
3646 let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3647 assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3648
3649 let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3650 assert!(incremented.is_none())
3651 }
3652
3653 #[test]
3654 fn test_no_column_index_when_stats_disabled() {
3655 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3659 let props = Arc::new(
3660 WriterProperties::builder()
3661 .set_statistics_enabled(EnabledStatistics::None)
3662 .build(),
3663 );
3664 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3665 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3666
3667 let data = Vec::new();
3668 let def_levels = vec![0; 10];
3669 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3670 writer.flush_data_pages().unwrap();
3671
3672 let column_close_result = writer.close().unwrap();
3673 assert!(column_close_result.offset_index.is_some());
3674 assert!(column_close_result.column_index.is_none());
3675 }
3676
3677 #[test]
3678 fn test_no_offset_index_when_disabled() {
3679 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3681 let props = Arc::new(
3682 WriterProperties::builder()
3683 .set_statistics_enabled(EnabledStatistics::None)
3684 .set_offset_index_disabled(true)
3685 .build(),
3686 );
3687 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3688 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3689
3690 let data = Vec::new();
3691 let def_levels = vec![0; 10];
3692 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3693 writer.flush_data_pages().unwrap();
3694
3695 let column_close_result = writer.close().unwrap();
3696 assert!(column_close_result.offset_index.is_none());
3697 assert!(column_close_result.column_index.is_none());
3698 }
3699
3700 #[test]
3701 fn test_offset_index_overridden() {
3702 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3704 let props = Arc::new(
3705 WriterProperties::builder()
3706 .set_statistics_enabled(EnabledStatistics::Page)
3707 .set_offset_index_disabled(true)
3708 .build(),
3709 );
3710 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3711 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3712
3713 let data = Vec::new();
3714 let def_levels = vec![0; 10];
3715 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3716 writer.flush_data_pages().unwrap();
3717
3718 let column_close_result = writer.close().unwrap();
3719 assert!(column_close_result.offset_index.is_some());
3720 assert!(column_close_result.column_index.is_some());
3721 }
3722
3723 #[test]
3724 fn test_boundary_order() -> Result<()> {
3725 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3726 let column_close_result = write_multiple_pages::<Int32Type>(
3728 &descr,
3729 &[
3730 &[Some(-10), Some(10)],
3731 &[Some(-5), Some(11)],
3732 &[None],
3733 &[Some(-5), Some(11)],
3734 ],
3735 )?;
3736 let boundary_order = column_close_result
3737 .column_index
3738 .unwrap()
3739 .get_boundary_order();
3740 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3741
3742 let column_close_result = write_multiple_pages::<Int32Type>(
3744 &descr,
3745 &[
3746 &[Some(10), Some(11)],
3747 &[Some(5), Some(11)],
3748 &[None],
3749 &[Some(-5), Some(0)],
3750 ],
3751 )?;
3752 let boundary_order = column_close_result
3753 .column_index
3754 .unwrap()
3755 .get_boundary_order();
3756 assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3757
3758 let column_close_result = write_multiple_pages::<Int32Type>(
3760 &descr,
3761 &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3762 )?;
3763 let boundary_order = column_close_result
3764 .column_index
3765 .unwrap()
3766 .get_boundary_order();
3767 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3768
3769 let column_close_result =
3771 write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3772 let boundary_order = column_close_result
3773 .column_index
3774 .unwrap()
3775 .get_boundary_order();
3776 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3777
3778 let column_close_result =
3780 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3781 let boundary_order = column_close_result
3782 .column_index
3783 .unwrap()
3784 .get_boundary_order();
3785 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3786
3787 let column_close_result =
3789 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3790 let boundary_order = column_close_result
3791 .column_index
3792 .unwrap()
3793 .get_boundary_order();
3794 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3795
3796 let column_close_result = write_multiple_pages::<Int32Type>(
3798 &descr,
3799 &[
3800 &[Some(10), Some(11)],
3801 &[Some(11), Some(16)],
3802 &[None],
3803 &[Some(-5), Some(0)],
3804 ],
3805 )?;
3806 let boundary_order = column_close_result
3807 .column_index
3808 .unwrap()
3809 .get_boundary_order();
3810 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3811
3812 let column_close_result = write_multiple_pages::<Int32Type>(
3814 &descr,
3815 &[
3816 &[Some(1), Some(9)],
3817 &[Some(2), Some(8)],
3818 &[None],
3819 &[Some(3), Some(7)],
3820 ],
3821 )?;
3822 let boundary_order = column_close_result
3823 .column_index
3824 .unwrap()
3825 .get_boundary_order();
3826 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3827
3828 Ok(())
3829 }
3830
    /// The same bit patterns yield different boundary orders depending on the
    /// logical type: interpreted as Float16 the pages descend; interpreted as
    /// plain fixed-len bytes they do not form a monotonic sequence.
    #[test]
    fn test_boundary_order_logical_type() -> Result<()> {
        // Two descriptors over the same physical type (2-byte FLBA):
        // one with a Float16 logical type, one without.
        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
        let fba_descr = {
            let tpe = SchemaType::primitive_type_builder(
                "col",
                FixedLenByteArrayType::get_physical_type(),
            )
            .with_length(2)
            .build()?;
            Arc::new(ColumnDescriptor::new(
                Arc::new(tpe),
                1,
                0,
                ColumnPath::from("col"),
            ))
        };

        // One value per page, strictly descending in Float16 order.
        let values: &[&[Option<FixedLenByteArray>]] = &[
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(
                f16::NEG_ZERO,
            )))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
        ];

        // Float16 logical type -> descending.
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));

        // Plain FLBA -> unordered.
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        Ok(())
    }
3880
3881 #[test]
3882 fn test_interval_stats_should_not_have_min_max() {
3883 let input = [
3884 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3885 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3886 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3887 ]
3888 .into_iter()
3889 .map(|s| ByteArray::from(s).into())
3890 .collect::<Vec<_>>();
3891
3892 let page_writer = get_test_page_writer();
3893 let mut writer = get_test_interval_column_writer(page_writer);
3894 writer.write_batch(&input, None, None).unwrap();
3895
3896 let metadata = writer.close().unwrap().metadata;
3897 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3898 stats.clone()
3899 } else {
3900 panic!("metadata missing statistics");
3901 };
3902 assert!(stats.min_bytes_opt().is_none());
3903 assert!(stats.max_bytes_opt().is_none());
3904 }
3905
    /// `get_estimated_total_bytes` starts at zero and grows as data pages are
    /// buffered.
    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        // Nothing written yet -> zero estimate.
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        assert_eq!(size_with_one_page, 20);

        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // Second page estimate is one byte larger than the first (21 vs 20).
        assert_eq!(size_with_two_pages, 20 + 21);
    }
3925
3926 fn write_multiple_pages<T: DataType>(
3927 column_descr: &Arc<ColumnDescriptor>,
3928 pages: &[&[Option<T::T>]],
3929 ) -> Result<ColumnCloseResult> {
3930 let column_writer = get_column_writer(
3931 column_descr.clone(),
3932 Default::default(),
3933 get_test_page_writer(),
3934 );
3935 let mut writer = get_typed_column_writer::<T>(column_writer);
3936
3937 for &page in pages {
3938 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3939 let def_levels = page
3940 .iter()
3941 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3942 .collect::<Vec<_>>();
3943 writer.write_batch(&values, Some(&def_levels), None)?;
3944 writer.flush_data_pages()?;
3945 }
3946
3947 writer.close()
3948 }
3949
    /// Generates random values and (optionally) definition/repetition levels
    /// within the given bounds, then round-trips them via `column_roundtrip`.
    fn column_roundtrip_random<T: DataType>(
        props: WriterProperties,
        max_size: usize,
        min_value: T::T,
        max_value: T::T,
        max_def_level: i16,
        max_rep_level: i16,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        // Number of non-null slots (those at the max definition level).
        let mut num_values: usize = 0;

        let mut buf: Vec<i16> = Vec::new();
        let def_levels = if max_def_level > 0 {
            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
            // Only slots at max_def_level carry an actual value.
            for &dl in &buf[..] {
                if dl == max_def_level {
                    num_values += 1;
                }
            }
            Some(&buf[..])
        } else {
            // Required column: every slot has a value.
            num_values = max_size;
            None
        };

        let mut buf: Vec<i16> = Vec::new();
        let rep_levels = if max_rep_level > 0 {
            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
            // First repetition level must be 0 so the batch starts a record.
            buf[0] = 0; Some(&buf[..])
        } else {
            None
        };

        let mut values: Vec<T::T> = Vec::new();
        random_numbers_range(num_values, min_value, max_value, &mut values);

        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
    }
3993
    /// Writes `values` (with optional def/rep levels) to a temp file through a
    /// serialized page writer, reads them back with a column reader, and
    /// asserts the data and the recorded row count round-trip unchanged.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        // Derive the writer's level limits from the supplied data.
        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // A single read call must be able to consume everything written.
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // Release the borrow on `file` so it can be reopened for reading.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Values and levels must round-trip exactly.
        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // rows_written must agree with the data: with rep levels each level-0
        // entry starts a new row; otherwise every level (or value) is a row.
        if let Some(levels) = actual_rep_levels {
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
4088
4089 fn column_write_and_get_metadata<T: DataType>(
4092 props: WriterProperties,
4093 values: &[T::T],
4094 ) -> ColumnChunkMetaData {
4095 let page_writer = get_test_page_writer();
4096 let props = Arc::new(props);
4097 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4098 writer.write_batch(values, None, None).unwrap();
4099 writer.close().unwrap().metadata
4100 }
4101
4102 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4104 PageEncodingStats {
4105 page_type,
4106 encoding,
4107 count,
4108 }
4109 }
4110
    /// Writes `data` with the given writer version and dictionary setting,
    /// then asserts the recorded dictionary page offset, encoding list, and
    /// per-page encoding stats all match the expectations.
    fn check_encoding_write_support<T: DataType>(
        version: WriterVersion,
        dict_enabled: bool,
        data: &[T::T],
        dictionary_page_offset: Option<i64>,
        encodings: &[Encoding],
        page_encoding_stats: &[PageEncodingStats],
    ) {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(dict_enabled)
            .build();
        let meta = column_write_and_get_metadata::<T>(props, data);
        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
        assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
    }
4131
    /// Creates a typed test column writer over the default test column
    /// descriptor for `T`'s physical type (column named "col").
    fn get_test_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }
4143
4144 fn get_test_column_writer_with_path<'a, T: DataType>(
4145 page_writer: Box<dyn PageWriter + 'a>,
4146 max_def_level: i16,
4147 max_rep_level: i16,
4148 props: WriterPropertiesPtr,
4149 path: ColumnPath,
4150 ) -> ColumnWriterImpl<'a, T> {
4151 let descr = Arc::new(get_test_column_descr_with_path::<T>(
4152 max_def_level,
4153 max_rep_level,
4154 path,
4155 ));
4156 let column_writer = get_column_writer(descr, props, page_writer);
4157 get_typed_column_writer::<T>(column_writer)
4158 }
4159
4160 fn get_test_column_reader<T: DataType>(
4162 page_reader: Box<dyn PageReader>,
4163 max_def_level: i16,
4164 max_rep_level: i16,
4165 ) -> ColumnReaderImpl<T> {
4166 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4167 let column_reader = get_column_reader(descr, page_reader);
4168 get_typed_column_reader::<T>(column_reader)
4169 }
4170
    /// Builds a column descriptor named "col" for `T`'s physical type with the
    /// given level limits.
    fn get_test_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            // The length only takes effect for FIXED_LEN_BYTE_ARRAY columns.
            .with_length(1)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4185
    /// Like [`get_test_column_descr`] but the column is named after the last
    /// element of `path`.
    fn get_test_column_descr_with_path<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
        path: ColumnPath,
    ) -> ColumnDescriptor {
        let name = path.string();
        let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
            // The length only takes effect for FIXED_LEN_BYTE_ARRAY columns.
            .with_length(1)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4200
    /// Writes `data` as a required INT32 column under `path`/`props`, then
    /// reads the file back and returns the number of values in each data page.
    fn write_and_collect_page_values(
        path: ColumnPath,
        props: WriterPropertiesPtr,
        data: &[i32],
    ) -> Vec<u32> {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let mut writer =
            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the borrow on `file` so it can be reopened for reading.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );

        // Only plain v1 data pages are expected (no dictionary/v2 pages here).
        let mut values_per_page = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            assert_eq!(page.page_type(), PageType::DATA_PAGE);
            values_per_page.push(page.num_values());
        }

        values_per_page
    }
4238
    /// Returns a no-op page writer (see [`TestPageWriter`]) for tests that do
    /// not need the written bytes.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }
4243
    /// A stub [`PageWriter`] that discards page data and only echoes the
    /// page's bookkeeping (sizes/counts) back in the [`PageWriteSpec`].
    struct TestPageWriter {}

    impl PageWriter for TestPageWriter {
        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
            let mut res = PageWriteSpec::new();
            // Copy bookkeeping from the page; nothing is actually persisted.
            res.page_type = page.page_type();
            res.uncompressed_size = page.uncompressed_size();
            res.compressed_size = page.compressed_size();
            res.num_values = page.num_values();
            // No backing storage, so every page "starts" at offset 0.
            res.offset = 0;
            res.bytes_written = page.data().len() as u64;
            Ok(res)
        }

        fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }
4262
4263 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4265 let page_writer = get_test_page_writer();
4266 let props = Default::default();
4267 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4268 writer.write_batch(values, None, None).unwrap();
4269
4270 let metadata = writer.close().unwrap().metadata;
4271 if let Some(stats) = metadata.statistics() {
4272 stats.clone()
4273 } else {
4274 panic!("metadata missing statistics");
4275 }
4276 }
4277
    /// Typed writer over the 16-byte decimal test column descriptor
    /// (see [`get_test_decimals_column_descr`]).
    fn get_test_decimals_column_writer<T: DataType>(
        page_writer: Box<dyn PageWriter>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, T> {
        let descr = Arc::new(get_test_decimals_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }
4292
    /// Column descriptor for a 16-byte decimal column with precision 3 and
    /// scale 2, set both via the Decimal logical type and the legacy
    /// scale/precision fields.
    fn get_test_decimals_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(16)
            .with_logical_type(Some(LogicalType::Decimal {
                scale: 2,
                precision: 3,
            }))
            .with_scale(2)
            .with_precision(3)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4311
4312 fn float16_statistics_roundtrip(
4313 values: &[FixedLenByteArray],
4314 ) -> ValueStatistics<FixedLenByteArray> {
4315 let page_writer = get_test_page_writer();
4316 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4317 writer.write_batch(values, None, None).unwrap();
4318
4319 let metadata = writer.close().unwrap().metadata;
4320 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4321 stats.clone()
4322 } else {
4323 panic!("metadata missing statistics");
4324 }
4325 }
4326
4327 fn get_test_float16_column_writer(
4328 page_writer: Box<dyn PageWriter>,
4329 props: WriterPropertiesPtr,
4330 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4331 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4332 let column_writer = get_column_writer(descr, props, page_writer);
4333 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4334 }
4335
    /// Column descriptor for a 2-byte FIXED_LEN_BYTE_ARRAY column annotated
    /// with the Float16 logical type.
    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(2)
                .with_logical_type(Some(LogicalType::Float16))
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4346
    /// Typed writer over the required, non-repeated INTERVAL test column.
    fn get_test_interval_column_writer(
        page_writer: Box<dyn PageWriter>,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_interval_column_descr());
        let column_writer = get_column_writer(descr, Default::default(), page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }
4354
    /// Column descriptor for a 12-byte FIXED_LEN_BYTE_ARRAY column with the
    /// legacy INTERVAL converted type (required, non-repeated).
    fn get_test_interval_column_descr() -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(12)
                .with_converted_type(ConvertedType::INTERVAL)
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
    }
4365
    /// Typed writer over a column whose unsigned semantics come from the
    /// legacy UINT_32 converted type rather than a logical type.
    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }
4380
    /// Column descriptor using the legacy UINT_32 converted type (no logical
    /// type) to mark the column as unsigned.
    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_converted_type(ConvertedType::UINT_32)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4393
4394 #[test]
4395 fn test_page_v2_snappy_compression_fallback() {
4396 let page_writer = TestPageWriter {};
4398
4399 let props = WriterProperties::builder()
4401 .set_writer_version(WriterVersion::PARQUET_2_0)
4402 .set_dictionary_enabled(false)
4404 .set_compression(Compression::SNAPPY)
4405 .build();
4406
4407 let mut column_writer =
4408 get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));
4409
4410 let values = vec![ByteArray::from("a")];
4413
4414 column_writer.write_batch(&values, None, None).unwrap();
4415
4416 let result = column_writer.close().unwrap();
4417 assert_eq!(
4418 result.metadata.uncompressed_size(),
4419 result.metadata.compressed_size()
4420 );
4421 }
4422}