1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::file::page_index::column_index::ColumnIndexMetaData;
25use crate::file::page_index::offset_index::OffsetIndexMetaData;
26use std::collections::{BTreeSet, VecDeque};
27use std::str;
28
29use crate::basic::{
30 BoundaryOrder, Compression, ConvertedType, Encoding, EncodingMask, LogicalType, PageType, Type,
31};
32use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
33use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
34use crate::compression::{Codec, CodecOptionsBuilder, create_codec};
35use crate::data_type::private::ParquetValueType;
36use crate::data_type::*;
37use crate::encodings::levels::LevelEncoder;
38#[cfg(feature = "encryption")]
39use crate::encryption::encrypt::get_column_crypto_metadata;
40use crate::errors::{ParquetError, Result};
41use crate::file::metadata::{
42 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
43 OffsetIndexBuilder, PageEncodingStats,
44};
45use crate::file::properties::{
46 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
47};
48use crate::file::statistics::{Statistics, ValueStatistics};
49use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
50
51pub(crate) mod encoder;
52
/// Expands to a `match` over every `ColumnWriter` variant, binding the typed
/// writer to `$i` and evaluating `$b` in each arm.
///
/// Because the same expression `$b` is expanded for every variant, it may only
/// use the API common to all `ColumnWriterImpl` instantiations.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
67
/// Type-erased column writer: one variant per Parquet physical type, each
/// wrapping the corresponding typed [`ColumnWriterImpl`].
pub enum ColumnWriter<'a> {
    /// Writer for `BOOLEAN` columns
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Writer for `INT32` columns
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Writer for `INT64` columns
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Writer for `INT96` columns
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Writer for `FLOAT` columns
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Writer for `DOUBLE` columns
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Writer for `BYTE_ARRAY` columns
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Writer for `FIXED_LEN_BYTE_ARRAY` columns
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
89
impl ColumnWriter<'_> {
    /// Returns the estimated total memory usage of the underlying typed writer.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    /// Returns the estimated total encoded bytes for this column writer.
    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    /// Flushes the currently buffered values into a new data page.
    #[cfg(feature = "arrow")]
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        downcast_writer!(self, typed, typed.add_data_page())
    }

    /// Closes the underlying typed writer, returning the chunk-level results.
    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}
117
/// Creates a [`ColumnWriter`] for the physical type of `descr`, wrapping the
/// matching typed [`ColumnWriterImpl`] constructed from the given properties
/// and page writer.
pub fn get_column_writer<'a>(
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    page_writer: Box<dyn PageWriter + 'a>,
) -> ColumnWriter<'a> {
    // The match is exhaustive over Parquet physical types so that adding a new
    // physical type forces this dispatch to be updated.
    match descr.physical_type() {
        Type::BOOLEAN => {
            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT32 => {
            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT64 => {
            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT96 => {
            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FLOAT => {
            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::DOUBLE => {
            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::BYTE_ARRAY => {
            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
            ColumnWriterImpl::new(descr, props, page_writer),
        ),
    }
}
151
152pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
157 T::get_column_writer(col_writer).unwrap_or_else(|| {
158 panic!(
159 "Failed to convert column writer into a typed column writer for `{}` type",
160 T::get_physical_type()
161 )
162 })
163}
164
165pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
167 col_writer: &'b ColumnWriter<'a>,
168) -> &'b ColumnWriterImpl<'a, T> {
169 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
170 panic!(
171 "Failed to convert column writer into a typed column writer for `{}` type",
172 T::get_physical_type()
173 )
174 })
175}
176
177pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
179 col_writer: &'a mut ColumnWriter<'b>,
180) -> &'a mut ColumnWriterImpl<'b, T> {
181 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
182 panic!(
183 "Failed to convert column writer into a typed column writer for `{}` type",
184 T::get_physical_type()
185 )
186 })
187}
188
/// The result of closing a column chunk writer: everything the enclosing row
/// group writer needs to finalize this chunk in the file metadata.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// The total number of bytes written by this column writer
    pub bytes_written: u64,
    /// The total number of rows written by this column writer
    pub rows_written: u64,
    /// Metadata for this column chunk
    pub metadata: ColumnChunkMetaData,
    /// Optional bloom filter for this column, if enabled
    pub bloom_filter: Option<Sbbf>,
    /// Optional column index (page-level min/max), if enabled
    pub column_index: Option<ColumnIndexMetaData>,
    /// Optional offset index (page locations), if enabled
    pub offset_index: Option<OffsetIndexMetaData>,
}
207
/// Counters and histograms accumulated for the data page currently being
/// buffered; reset via `new_page` each time a page is flushed.
#[derive(Default)]
struct PageMetrics {
    /// Number of values (including nulls) buffered for the current page
    num_buffered_values: u32,
    /// Number of rows (records) buffered for the current page
    num_buffered_rows: u32,
    /// Number of nulls in the current page
    num_page_nulls: u64,
    /// Per-page repetition level histogram, if enabled
    repetition_level_histogram: Option<LevelHistogram>,
    /// Per-page definition level histogram, if enabled
    definition_level_histogram: Option<LevelHistogram>,
}
217
218impl PageMetrics {
219 fn new() -> Self {
220 Default::default()
221 }
222
223 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
225 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
226 self
227 }
228
229 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
231 self.definition_level_histogram = LevelHistogram::try_new(max_level);
232 self
233 }
234
235 fn new_page(&mut self) {
238 self.num_buffered_values = 0;
239 self.num_buffered_rows = 0;
240 self.num_page_nulls = 0;
241 self.repetition_level_histogram
242 .as_mut()
243 .map(LevelHistogram::reset);
244 self.definition_level_histogram
245 .as_mut()
246 .map(LevelHistogram::reset);
247 }
248
249 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
251 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
252 rep_hist.update_from_levels(levels);
253 }
254 }
255
256 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
258 if let Some(ref mut def_hist) = self.definition_level_histogram {
259 def_hist.update_from_levels(levels);
260 }
261 }
262}
263
/// Counters, statistics and histograms accumulated for the whole column chunk
/// across all pages written so far.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    /// Total bytes written to the page writer for this chunk
    total_bytes_written: u64,
    /// Total rows (records) written for this chunk
    total_rows_written: u64,
    /// Sum of uncompressed page sizes
    total_uncompressed_size: u64,
    /// Sum of compressed page sizes
    total_compressed_size: u64,
    /// Total number of values written (data pages only)
    total_num_values: u64,
    /// File offset of the dictionary page, if one was written
    dictionary_page_offset: Option<u64>,
    /// File offset of the first data page, if any was written
    data_page_offset: Option<u64>,
    /// Chunk-level minimum value seen so far
    min_column_value: Option<T>,
    /// Chunk-level maximum value seen so far
    max_column_value: Option<T>,
    /// Total null count for the chunk
    num_column_nulls: u64,
    /// Distinct count, only retained when supplied for the whole chunk
    column_distinct_count: Option<u64>,
    /// Accumulated variable-length byte count (byte array columns)
    variable_length_bytes: Option<i64>,
    /// Chunk-level repetition level histogram, if enabled
    repetition_level_histogram: Option<LevelHistogram>,
    /// Chunk-level definition level histogram, if enabled
    definition_level_histogram: Option<LevelHistogram>,
}
282
283impl<T: Default> ColumnMetrics<T> {
284 fn new() -> Self {
285 Default::default()
286 }
287
288 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
290 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
291 self
292 }
293
294 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
296 self.definition_level_histogram = LevelHistogram::try_new(max_level);
297 self
298 }
299
300 fn update_histogram(
302 chunk_histogram: &mut Option<LevelHistogram>,
303 page_histogram: &Option<LevelHistogram>,
304 ) {
305 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
306 chunk_hist.add(page_hist);
307 }
308 }
309
310 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
313 ColumnMetrics::<T>::update_histogram(
314 &mut self.definition_level_histogram,
315 &page_metrics.definition_level_histogram,
316 );
317 ColumnMetrics::<T>::update_histogram(
318 &mut self.repetition_level_histogram,
319 &page_metrics.repetition_level_histogram,
320 );
321 }
322
323 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
325 if let Some(var_bytes) = variable_length_bytes {
326 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
327 }
328 }
329}
330
331pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
333
/// Writes the values of a single column chunk, buffering encoded pages and
/// accumulating statistics, indexes and metadata until [`Self::close`].
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    // Descriptor of the column being written
    descr: ColumnDescPtr,

    // Writer properties (compression, page limits, statistics level, ...)
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    // Encoder for the column's values
    encoder: E,

    // Metrics for the page currently being buffered
    page_metrics: PageMetrics,
    // Metrics accumulated for the whole chunk
    column_metrics: ColumnMetrics<E::T>,

    // All encodings used in this chunk (RLE is always present for levels)
    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    def_levels_encoder: LevelEncoder,
    rep_levels_encoder: LevelEncoder,
    // Pages held back until the dictionary page is written first
    data_pages: VecDeque<CompressedPage>,
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    // Track whether data-page min/max boundaries stay ordered, to derive the
    // column index BoundaryOrder on close
    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
369
370impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    /// Creates a column writer for `descr` using the given properties,
    /// writing pages through `page_writer`.
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        // Levels are always encoded with RLE, so it is unconditionally listed.
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        // Level histograms are only tracked when some statistics are enabled.
        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level())
        }

        // The column index needs page-level statistics; anything less makes it
        // invalid from the start.
        let mut column_index_builder = ColumnIndexBuilder::new(descr.physical_type());
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            def_levels_encoder: Self::create_level_encoder(descr.max_def_level(), &props),
            rep_levels_encoder: Self::create_level_encoder(descr.max_rep_level(), &props),
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            // Both orders hold vacuously until a second non-null page arrives.
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }
435
    /// Writes a batch of values with optional levels and precomputed
    /// statistics, splitting the work into mini batches of roughly
    /// `write_batch_size` levels that always end on a record boundary.
    ///
    /// Returns the number of values (not levels) written.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        // Definition and repetition level slices must describe the same levels.
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        // With no definition levels every value is a level of its own.
        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        // A caller-supplied distinct count is only trustworthy for the whole
        // chunk, i.e. when this is the first batch written.
        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            // Extend the batch so it never splits a record: advance until the
            // next repetition level of 0 (start of a new record).
            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        Ok(values_offset)
    }
514
    /// Writes a batch of values with optional definition and repetition
    /// levels. Statistics are computed internally from the values.
    ///
    /// Returns the number of values written.
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }
535
    /// Writes a batch like [`Self::write_batch`], but lets the caller supply
    /// precomputed chunk-level `min`, `max` and `distinct_count` statistics
    /// instead of deriving them from the values.
    ///
    /// Returns the number of values written.
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }
562
    /// Returns an estimate of this writer's current memory usage: bytes
    /// already written plus the encoder's estimated in-memory footprint.
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }
571
    /// Returns the total number of bytes written to the page writer so far.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
580
581 #[cfg(feature = "arrow")]
587 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
588 self.data_pages
589 .iter()
590 .map(|page| page.data().len() as u64)
591 .sum::<u64>()
592 + self.column_metrics.total_bytes_written
593 + self.encoder.estimated_data_page_size() as u64
594 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
595 }
596
    /// Returns the total number of rows (records) written so far.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
602
    /// Returns a reference to the descriptor of the column being written.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
607
    /// Finalizes this column writer: flushes any buffered page, writes the
    /// dictionary page (if any), drains held-back pages, and assembles the
    /// chunk metadata, column index and offset index.
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        // Seal the partially filled page, if any.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        // The dictionary page must precede the data pages in the chunk.
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            // When both orders hold (e.g. at most one non-null page was
            // written) ascending is reported.
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        let column_index = match self.column_index_builder.valid() {
            true => Some(self.column_index_builder.build()?),
            false => None,
        };

        let offset_index = self.offset_index_builder.map(|b| b.build());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }
649
    /// Creates a streaming level encoder matching the configured writer
    /// version (v1 for PARQUET_1_0 page layout, v2 for PARQUET_2_0).
    fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
        match props.writer_version() {
            WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(max_level),
            WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
        }
    }
657
    /// Writes one mini batch: encodes the levels, counts nulls and rows,
    /// writes the non-null values, then flushes a page and/or falls back from
    /// dictionary encoding when the configured limits are reached.
    ///
    /// Returns the number of values (non-null entries) written.
    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        // Only levels equal to max_def_level correspond to present values;
        // everything below is a null at some nesting depth.
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            let values_to_write = levels
                .iter()
                .map(|level| (*level == self.descr.max_def_level()) as usize)
                .sum();
            self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;

            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_encoder.put(levels);
            values_to_write
        } else {
            // A required column has no definition levels: every level slot is
            // a value.
            num_levels
        };

        if self.descr.max_rep_level() > 0 {
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            // Batches must begin at a record boundary (repetition level 0).
            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            // Each repetition level of 0 starts a new row.
            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_encoder.put(levels);
        } else {
            // Without repetition levels every level is its own row.
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }
746
747 #[inline]
752 fn should_dict_fallback(&self) -> bool {
753 match self.encoder.estimated_dict_page_size() {
754 Some(size) => {
755 size >= self
756 .props
757 .column_dictionary_page_size_limit(self.descr.path())
758 }
759 None => false,
760 }
761 }
762
763 #[inline]
765 fn should_add_data_page(&self) -> bool {
766 if self.page_metrics.num_buffered_values == 0 {
771 return false;
772 }
773
774 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
775 || self.encoder.estimated_data_page_size()
776 >= self.props.column_data_page_size_limit(self.descr.path())
777 }
778
    /// Abandons dictionary encoding: seals the current page, writes the
    /// dictionary page accumulated so far, then flushes all held-back pages.
    fn dict_fallback(&mut self) -> Result<()> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }
790
    /// Appends the just-completed page to the column index and offset index
    /// builders, and updates the running page-boundary ordering used to derive
    /// the chunk's `BoundaryOrder`.
    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        // A page is a "null page" when every buffered row is null.
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        if null_page && self.column_index_builder.valid() {
            // Null pages carry empty min/max byte strings in the column index.
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            match &page_statistics {
                None => {
                    // Without page statistics a column index cannot be built.
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    // Compare against the previous non-null page to maintain
                    // the ascending/descending boundary flags.
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        // Histograms are appended regardless of whether min/max were valid.
        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }
880
881 fn can_truncate_value(&self) -> bool {
883 match self.descr.physical_type() {
884 Type::FIXED_LEN_BYTE_ARRAY
888 if !matches!(
889 self.descr.logical_type_ref(),
890 Some(&LogicalType::Decimal { .. }) | Some(&LogicalType::Float16)
891 ) =>
892 {
893 true
894 }
895 Type::BYTE_ARRAY => true,
896 _ => false,
898 }
899 }
900
901 fn is_utf8(&self) -> bool {
903 self.get_descriptor().logical_type_ref() == Some(&LogicalType::String)
904 || self.get_descriptor().converted_type() == ConvertedType::UTF8
905 }
906
907 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
918 truncation_length
919 .filter(|l| data.len() > *l)
920 .and_then(|l|
921 if self.is_utf8() {
923 match str::from_utf8(data) {
924 Ok(str_data) => truncate_utf8(str_data, l),
925 Err(_) => Some(data[..l].to_vec()),
926 }
927 } else {
928 Some(data[..l].to_vec())
929 }
930 )
931 .map(|truncated| (truncated, true))
932 .unwrap_or_else(|| (data.to_vec(), false))
933 }
934
935 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
949 truncation_length
950 .filter(|l| data.len() > *l)
951 .and_then(|l|
952 if self.is_utf8() {
954 match str::from_utf8(data) {
955 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
956 Err(_) => increment(data[..l].to_vec()),
957 }
958 } else {
959 increment(data[..l].to_vec())
960 }
961 )
962 .map(|truncated| (truncated, true))
963 .unwrap_or_else(|| (data.to_vec(), false))
964 }
965
    /// Applies the configured statistics truncation to byte-array statistics,
    /// marking min/max as inexact when they were truncated. Other statistics
    /// variants pass through unchanged.
    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    // A truncated bound is no longer the exact min/max.
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            // Fixed-length arrays are only truncated when the logical type
            // permits it (see `can_truncate_value`).
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            stats => stats,
        }
    }
1018
    /// Seals the currently buffered values into a compressed data page,
    /// updating indexes and chunk metrics, then either queues the page (while
    /// a dictionary is active) or writes it out immediately.
    pub(crate) fn add_data_page(&mut self) -> Result<()> {
        // Pull the encoded values (and their page-level min/max) out of the
        // encoder.
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Fold the page min/max into the chunk-level min/max.
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Page-level statistics are only materialized when enabled.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // Update the column/offset indexes before the page metrics are reset.
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // Statistics embedded in the page header are optional and truncated.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                // V1 layout: rep levels, then def levels, then values — all
                // compressed together.
                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    compressed_buf.shrink_to_fit();
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                // V2 layout: levels are stored uncompressed ahead of the
                // (optionally compressed) values, with their byte lengths
                // recorded in the page header.
                if max_rep_level > 0 {
                    self.rep_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    rep_levels_byte_len = buffer.len();
                }

                if max_def_level > 0 {
                    self.def_levels_encoder
                        .flush_to(|data| buffer.extend_from_slice(data));
                    def_levels_byte_len = buffer.len() - rep_levels_byte_len;
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                let is_compressed = match self.compressor {
                    Some(ref mut cmpr) => {
                        let buffer_len = buffer.len();
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                        // Keep the uncompressed values when compression did
                        // not actually shrink them.
                        if uncompressed_size <= buffer.len() - buffer_len {
                            buffer.truncate(buffer_len);
                            buffer.extend_from_slice(&values_data.buf);
                            false
                        } else {
                            true
                        }
                    }
                    None => {
                        buffer.extend_from_slice(&values_data.buf);
                        false
                    }
                };

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // While a dictionary is active, data pages are held back so the
        // dictionary page can be written first.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
        self.page_metrics.new_page();

        Ok(())
    }
1169
1170 #[inline]
1173 fn flush_data_pages(&mut self) -> Result<()> {
1174 if self.page_metrics.num_buffered_values > 0 {
1176 self.add_data_page()?;
1177 }
1178
1179 while let Some(page) = self.data_pages.pop_front() {
1180 self.write_data_page(page)?;
1181 }
1182
1183 Ok(())
1184 }
1185
    /// Assembles the [`ColumnChunkMetaData`] for this chunk from the
    /// accumulated metrics, statistics, histograms and encryption properties.
    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        // A chunk with no data pages reports a data page offset of 0.
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings_mask(EncodingMask::new_from_encodings(self.encodings.iter()))
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                // `take` moves the histograms out; this method is only called
                // once, from `close`.
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );

            if let Some(geo_stats) = self.encoder.flush_geospatial_statistics() {
                builder = builder.set_geo_statistics(geo_stats);
            }
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }
1240
    /// Writes a compressed data page through the page writer, recording its
    /// encoding, encoding stats, offset-index entry and chunk metrics.
    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        // Consecutive pages with the same (page type, encoding) are coalesced
        // into a single stats entry with an incremented count.
        match self.encoding_stats.last_mut() {
            Some(encoding_stats)
                if encoding_stats.page_type == page.page_type()
                    && encoding_stats.encoding == page.encoding() =>
            {
                encoding_stats.count += 1;
            }
            _ => {
                self.encoding_stats.push(PageEncodingStats {
                    page_type: page.page_type(),
                    encoding: page.encoding(),
                    count: 1,
                });
            }
        }
        let page_spec = self.page_writer.write_page(page)?;
        // Only data pages contribute to the offset index.
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }
1272
1273 #[inline]
1275 fn write_dictionary_page(&mut self) -> Result<()> {
1276 let compressed_page = {
1277 let mut page = self
1278 .encoder
1279 .flush_dict_page()?
1280 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1281
1282 let uncompressed_size = page.buf.len();
1283
1284 if let Some(ref mut cmpr) = self.compressor {
1285 let mut output_buf = Vec::with_capacity(uncompressed_size);
1286 cmpr.compress(&page.buf, &mut output_buf)?;
1287 page.buf = Bytes::from(output_buf);
1288 }
1289
1290 let dict_page = Page::DictionaryPage {
1291 buf: page.buf,
1292 num_values: page.num_values as u32,
1293 encoding: self.props.dictionary_page_encoding(),
1294 is_sorted: page.is_sorted,
1295 };
1296 CompressedPage::new(dict_page, uncompressed_size)
1297 };
1298
1299 self.encodings.insert(compressed_page.encoding());
1300 self.encoding_stats.push(PageEncodingStats {
1301 page_type: PageType::DICTIONARY_PAGE,
1302 encoding: compressed_page.encoding(),
1303 count: 1,
1304 });
1305 let page_spec = self.page_writer.write_page(compressed_page)?;
1306 self.update_metrics_for_page(page_spec);
1307 Ok(())
1309 }
1310
1311 #[inline]
1313 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1314 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1315 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1316 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1317
1318 match page_spec.page_type {
1319 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1320 self.column_metrics.total_num_values += page_spec.num_values as u64;
1321 if self.column_metrics.data_page_offset.is_none() {
1322 self.column_metrics.data_page_offset = Some(page_spec.offset);
1323 }
1324 }
1325 PageType::DICTIONARY_PAGE => {
1326 assert!(
1327 self.column_metrics.dictionary_page_offset.is_none(),
1328 "Dictionary offset is already set"
1329 );
1330 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1331 }
1332 _ => {}
1333 }
1334 }
1335
1336 #[inline]
1337 #[cfg(feature = "encryption")]
1338 fn set_column_chunk_encryption_properties(
1339 &self,
1340 builder: ColumnChunkMetaDataBuilder,
1341 ) -> ColumnChunkMetaDataBuilder {
1342 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1343 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1344 encryption_properties,
1345 &self.descr,
1346 ))
1347 } else {
1348 builder
1349 }
1350 }
1351
    /// No-op counterpart used when the `encryption` feature is disabled:
    /// returns the builder unchanged so callers need no cfg guards.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1360}
1361
/// Replaces `min` with `val` when `val` is smaller than the current minimum
/// (or when no minimum is recorded yet). NaN values are skipped by the
/// underlying `update_stat` helper.
fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    // `val` becomes the minimum when the current value compares greater than it.
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}
1365
/// Replaces `max` with `val` when `val` is larger than the current maximum
/// (or when no maximum is recorded yet). NaN values are skipped by the
/// underlying `update_stat` helper.
fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    // `val` becomes the maximum when it compares greater than the current value.
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}
1369
1370#[inline]
1371#[allow(clippy::eq_op)]
1372fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1373 match T::PHYSICAL_TYPE {
1374 Type::FLOAT | Type::DOUBLE => val != val,
1375 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type_ref() == Some(&LogicalType::Float16) => {
1376 let val = val.as_bytes();
1377 let val = f16::from_le_bytes([val[0], val[1]]);
1378 val.is_nan()
1379 }
1380 _ => false,
1381 }
1382}
1383
1384fn update_stat<T: ParquetValueType, F>(
1389 descr: &ColumnDescriptor,
1390 val: &T,
1391 cur: &mut Option<T>,
1392 should_update: F,
1393) where
1394 F: Fn(&T) -> bool,
1395{
1396 if is_nan(descr, val) {
1397 return;
1398 }
1399
1400 if cur.as_ref().is_none_or(should_update) {
1401 *cur = Some(val.clone());
1402 }
1403}
1404
1405fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1407 match T::PHYSICAL_TYPE {
1408 Type::INT32 | Type::INT64 => {
1409 if let Some(LogicalType::Integer {
1410 is_signed: false, ..
1411 }) = descr.logical_type_ref()
1412 {
1413 return compare_greater_unsigned_int(a, b);
1415 }
1416
1417 match descr.converted_type() {
1418 ConvertedType::UINT_8
1419 | ConvertedType::UINT_16
1420 | ConvertedType::UINT_32
1421 | ConvertedType::UINT_64 => {
1422 return compare_greater_unsigned_int(a, b);
1423 }
1424 _ => {}
1425 };
1426 }
1427 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1428 if let Some(LogicalType::Decimal { .. }) = descr.logical_type_ref() {
1429 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1430 }
1431 if let ConvertedType::DECIMAL = descr.converted_type() {
1432 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1433 }
1434 if let Some(LogicalType::Float16) = descr.logical_type_ref() {
1435 return compare_greater_f16(a.as_bytes(), b.as_bytes());
1436 }
1437 }
1438
1439 _ => {}
1440 }
1441
1442 a > b
1444}
1445
1446fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1454 match (kind, props.writer_version()) {
1455 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1456 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1457 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1458 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1459 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1460 _ => Encoding::PLAIN,
1461 }
1462}
1463
1464fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1466 match (kind, props.writer_version()) {
1467 (Type::BOOLEAN, _) => false,
1469 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1471 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1472 _ => true,
1473 }
1474}
1475
/// Compares two integer values as unsigned, for columns whose logical or
/// converted type is an unsigned integer (the physical storage is signed).
#[inline]
fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
    // `as_u64` reinterprets the stored bits; callers only reach this for
    // integer physical types, where the conversion is always `Some`.
    a.as_u64().unwrap() > b.as_u64().unwrap()
}
1480
1481#[inline]
1482fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1483 let a = f16::from_le_bytes(a.try_into().unwrap());
1484 let b = f16::from_le_bytes(b.try_into().unwrap());
1485 a > b
1486}
1487
/// Returns `true` if decimal `a` is greater than decimal `b`, where both are
/// big-endian two's-complement integers that may be sign-extended to
/// different lengths (e.g. `0x0005` and `0x05` encode the same value).
///
/// Fix: the original fell through to `a[1..] > b[1..]` even when the lengths
/// differed, comparing misaligned slices (e.g. it reported `[0x02] > [0x00,
/// 0x01]` — i.e. 2 > 1 — as `false`). When the longer value's lead bytes are
/// all sign-extension bytes, the values reduce to equal-length arrays with
/// identical sign bits, so the length-aligned tails must be compared instead.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    // An empty array sorts below any non-empty array.
    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Short circuit for different signs, or for equal-length arrays whose
    // first bytes already differ (same sign + equal length => unsigned
    // ordering of the first byte decides).
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // Byte a sign-extended value is padded with on the left.
    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        // Check whether the longer array's lead bytes are pure sign extension.
        let not_equal = if a_length > b_length {
            let lead_length = a_length - b_length;
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            let lead_length = b_length - a_length;
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            // The longer array carries significant bytes: it has the larger
            // magnitude, so it is greater when positive, smaller when negative.
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            return if negative_values { !a_longer } else { a_longer };
        }

        // All lead bytes are sign extension, so the values reduce to
        // equal-length arrays with identical sign bits; unsigned
        // lexicographical comparison of the aligned tails is then correct.
        return if a_length > b_length {
            a[(a_length - b_length)..] > b[..]
        } else {
            a[..] > b[(b_length - a_length)..]
        };
    }

    // Equal length, same sign, equal first byte (otherwise short-circuited
    // above): lexicographical comparison of the remainder decides.
    (a[1..]) > (b[1..])
}
1533
/// Returns the longest non-empty prefix of `data` that is at most `length`
/// bytes and ends on a UTF-8 character boundary, or `None` if no such prefix
/// exists (empty input, `length == 0`, or a first character wider than
/// `length`).
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    // Walk downward from the byte budget to the nearest char boundary.
    let mut split = length.min(data.len());
    while split > 0 && !data.is_char_boundary(split) {
        split -= 1;
    }
    (split > 0).then(|| data.as_bytes()[..split].to_vec())
}
1543
1544fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1550 let lower_bound = length.saturating_sub(3);
1552 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1553 increment_utf8(data.get(..split)?)
1554}
1555
/// Returns the smallest valid UTF-8 byte sequence strictly greater than
/// `data`: the last character that can be bumped to the next code point of
/// the same encoded width is incremented, and everything after it is dropped.
/// Returns `None` when no character can be incremented this way.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    for (idx, ch) in data.char_indices().rev() {
        let width = ch.len_utf8();
        // Skip invalid code points (e.g. the surrogate range) and increments
        // that would change the character's encoded width.
        if let Some(next) = char::from_u32(ch as u32 + 1).filter(|n| n.len_utf8() == width) {
            let mut out = data.as_bytes()[..idx + width].to_vec();
            next.encode_utf8(&mut out[idx..]);
            return Some(out);
        }
    }

    None
}
1577
/// Treats `data` as an unsigned big-endian integer and adds one, propagating
/// the carry from the least-significant byte. Returns `None` when every byte
/// overflows (the value was all `0xFF`), i.e. no same-length increment exists.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    for i in (0..data.len()).rev() {
        data[i] = data[i].wrapping_add(1);
        // A non-zero result means the carry stopped here.
        if data[i] != 0 {
            return Some(data);
        }
    }

    None
}
1593
1594#[cfg(test)]
1595mod tests {
1596 use crate::{
1597 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1598 schema::parser::parse_message_type,
1599 };
1600 use core::str;
1601 use rand::distr::uniform::SampleUniform;
1602 use std::{fs::File, sync::Arc};
1603
1604 use crate::column::{
1605 page::PageReader,
1606 reader::{ColumnReaderImpl, get_column_reader, get_typed_column_reader},
1607 };
1608 use crate::file::writer::TrackedWrite;
1609 use crate::file::{
1610 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1611 };
1612 use crate::schema::types::{ColumnPath, Type as SchemaType};
1613 use crate::util::test_common::rand_gen::random_numbers_range;
1614
1615 use super::*;
1616
    #[test]
    fn test_column_writer_inconsistent_def_rep_length() {
        // write_batch must reject definition/repetition level slices of
        // different lengths with a descriptive error.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
            );
        }
    }
1631
    #[test]
    fn test_column_writer_invalid_def_levels() {
        // Omitting definition levels when max def level > 0 must be an error.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Definition levels are required, because max definition level = 1"
            );
        }
    }
1646
    #[test]
    fn test_column_writer_invalid_rep_levels() {
        // Omitting repetition levels when max rep level > 0 must be an error.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
        let res = writer.write_batch(&[1, 2, 3, 4], None, None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Repetition levels are required, because max repetition level = 1"
            );
        }
    }
1661
    #[test]
    fn test_column_writer_not_enough_values_to_write() {
        // Four definition levels of 1 imply four non-null values, but only
        // two values are supplied — the writer must report the shortfall.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
        assert!(res.is_err());
        if let Err(err) = res {
            assert_eq!(
                format!("{err}"),
                "Parquet error: Expected to write 4 values, but have only 2"
            );
        }
    }
1676
    #[test]
    fn test_column_writer_write_only_one_dictionary_page() {
        // After the dictionary has been flushed once, a second flush must
        // fail because the encoder has been drained.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        writer.write_dictionary_page().unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }
1689
    #[test]
    fn test_column_writer_error_when_writing_disabled_dictionary() {
        // With dictionary encoding disabled there is no dictionary encoder,
        // so flushing a dictionary page must fail.
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_dictionary_enabled(false)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        let err = writer.write_dictionary_page().unwrap_err().to_string();
        assert_eq!(err, "Parquet error: Dictionary encoder is not set");
    }
1703
1704 #[test]
1705 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1706 let page_writer = get_test_page_writer();
1707 let props = Arc::new(
1708 WriterProperties::builder()
1709 .set_dictionary_enabled(true)
1710 .build(),
1711 );
1712 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1713 writer
1714 .write_batch(&[true, false, true, false], None, None)
1715 .unwrap();
1716
1717 let r = writer.close().unwrap();
1718 assert_eq!(r.bytes_written, 1);
1721 assert_eq!(r.rows_written, 4);
1722
1723 let metadata = r.metadata;
1724 assert_eq!(
1725 metadata.encodings().collect::<Vec<_>>(),
1726 vec![Encoding::PLAIN, Encoding::RLE]
1727 );
1728 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1730 }
1731
    #[test]
    fn test_column_writer_default_encoding_support_bool() {
        // Booleans: PLAIN pages under v1 (dictionary flag is ignored), RLE
        // data pages under v2.
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
1767
    #[test]
    fn test_column_writer_default_encoding_support_int32() {
        // INT32: dictionary when enabled; PLAIN fallback under v1,
        // DELTA_BINARY_PACKED fallback under v2.
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int32Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1813
    #[test]
    fn test_column_writer_default_encoding_support_int64() {
        // INT64: same default-encoding matrix as INT32.
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1, 2],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1, 2],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int64Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1, 2],
            None,
            &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BINARY_PACKED,
                1,
            )],
        );
    }
1859
    #[test]
    fn test_column_writer_default_encoding_support_int96() {
        // INT96: dictionary when enabled; PLAIN fallback under both writer
        // versions (no delta encoding exists for INT96).
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_1_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            true,
            &[Int96::from(vec![1, 2, 3])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<Int96Type>(
            WriterVersion::PARQUET_2_0,
            false,
            &[Int96::from(vec![1, 2, 3])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1901
    #[test]
    fn test_column_writer_default_encoding_support_float() {
        // FLOAT: dictionary when enabled; PLAIN fallback under both versions.
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FloatType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1943
    #[test]
    fn test_column_writer_default_encoding_support_double() {
        // DOUBLE: same default-encoding matrix as FLOAT.
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[1.0, 2.0],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<DoubleType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[1.0, 2.0],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
        );
    }
1985
    #[test]
    fn test_column_writer_default_encoding_support_byte_array() {
        // BYTE_ARRAY: dictionary when enabled; PLAIN fallback under v1,
        // DELTA_BYTE_ARRAY fallback under v2.
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8])],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<ByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8])],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2031
    #[test]
    fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
        // FIXED_LEN_BYTE_ARRAY: no dictionary under v1 (even when enabled),
        // dictionary under v2; DELTA_BYTE_ARRAY fallback under v2.
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            true,
            &[ByteArray::from(vec![1u8]).into()],
            Some(0),
            &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
            &[
                encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
                encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
            ],
        );
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_2_0,
            false,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
            &[encoding_stats(
                PageType::DATA_PAGE_V2,
                Encoding::DELTA_BYTE_ARRAY,
                1,
            )],
        );
    }
2074
    #[test]
    fn test_column_writer_check_metadata() {
        // Closing the writer must produce chunk metadata with the expected
        // sizes, offsets, encodings, and min/max/null-count statistics.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &1);
                assert_eq!(stats.max_opt().unwrap(), &4);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2109
    #[test]
    fn test_column_writer_check_byte_array_min_max() {
        // Decimal byte arrays must use signed (sign-extended big-endian)
        // ordering for min/max: the 0xFF-prefixed (negative) value is the
        // min and the small positive value is the max, not byte-wise order.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
        writer
            .write_batch(
                &[
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
                        152u8, 177u8, 56u8, 0u8, 0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
                        0u8,
                    ]),
                    ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ]),
                ],
                None,
                None,
            )
            .unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::ByteArray(stats) = stats {
                assert_eq!(
                    stats.min_opt().unwrap(),
                    &ByteArray::from(vec![
                        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
                        35u8, 231u8, 90u8, 0u8, 0u8,
                    ])
                );
                assert_eq!(
                    stats.max_opt().unwrap(),
                    &ByteArray::from(vec![
                        0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
                        44u8, 0u8, 0u8,
                    ])
                );
            } else {
                panic!("expecting Statistics::ByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2163
    #[test]
    fn test_column_writer_uint32_converted_type_min_max() {
        // A UINT_32 converted type must use unsigned comparison when
        // computing min/max statistics.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
            page_writer,
            0,
            0,
            props,
        );
        writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &0,);
                assert_eq!(stats.max_opt().unwrap(), &5,);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2187
    #[test]
    fn test_column_writer_precalculated_statistics() {
        // With chunk-level statistics enabled, caller-supplied min/max and
        // distinct counts must be carried through to the chunk metadata
        // instead of values computed from the data.
        let page_writer = get_test_page_writer();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer
            .write_batch_with_statistics(
                &[1, 2, 3, 4],
                None,
                None,
                Some(&-17),
                Some(&9000),
                Some(55),
            )
            .unwrap();

        let r = writer.close().unwrap();
        assert_eq!(r.bytes_written, 20);
        assert_eq!(r.rows_written, 4);

        let metadata = r.metadata;
        assert_eq!(
            metadata.encodings().collect::<Vec<_>>(),
            vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
        );
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.compressed_size(), 20);
        assert_eq!(metadata.uncompressed_size(), 20);
        assert_eq!(metadata.data_page_offset(), 0);
        assert_eq!(metadata.dictionary_page_offset(), Some(0));
        if let Some(stats) = metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
            if let Statistics::Int32(stats) = stats {
                assert_eq!(stats.min_opt().unwrap(), &-17);
                assert_eq!(stats.max_opt().unwrap(), &9000);
            } else {
                panic!("expecting Statistics::Int32");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
2235
    #[test]
    fn test_mixed_precomputed_statistics() {
        // Mixing plain writes with precomputed-statistics writes: chunk stats
        // must merge both batches (min from batch 1, max from batch 2), the
        // distinct count must be dropped (it cannot be merged), and page
        // header statistics must be round-trippable through the reader.
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .set_read_page_statistics(true)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }
2294
    #[test]
    fn test_disabled_statistics() {
        // With statistics disabled, neither the chunk metadata nor the page
        // headers carry statistics, while value/null/row counts survive.
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        assert!(r.metadata.statistics().is_none());

        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2350
2351 #[test]
2352 fn test_column_writer_empty_column_roundtrip() {
2353 let props = Default::default();
2354 column_roundtrip::<Int32Type>(props, &[], None, None);
2355 }
2356
2357 #[test]
2358 fn test_column_writer_non_nullable_values_roundtrip() {
2359 let props = Default::default();
2360 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2361 }
2362
2363 #[test]
2364 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2365 let props = Default::default();
2366 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2367 }
2368
2369 #[test]
2370 fn test_column_writer_nullable_repeated_values_roundtrip() {
2371 let props = Default::default();
2372 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2373 }
2374
2375 #[test]
2376 fn test_column_writer_dictionary_fallback_small_data_page() {
2377 let props = WriterProperties::builder()
2378 .set_dictionary_page_size_limit(32)
2379 .set_data_page_size_limit(32)
2380 .build();
2381 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2382 }
2383
2384 #[test]
2385 fn test_column_writer_small_write_batch_size() {
2386 for i in &[1usize, 2, 5, 10, 11, 1023] {
2387 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2388
2389 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2390 }
2391 }
2392
2393 #[test]
2394 fn test_column_writer_dictionary_disabled_v1() {
2395 let props = WriterProperties::builder()
2396 .set_writer_version(WriterVersion::PARQUET_1_0)
2397 .set_dictionary_enabled(false)
2398 .build();
2399 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2400 }
2401
2402 #[test]
2403 fn test_column_writer_dictionary_disabled_v2() {
2404 let props = WriterProperties::builder()
2405 .set_writer_version(WriterVersion::PARQUET_2_0)
2406 .set_dictionary_enabled(false)
2407 .build();
2408 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2409 }
2410
2411 #[test]
2412 fn test_column_writer_compression_v1() {
2413 let props = WriterProperties::builder()
2414 .set_writer_version(WriterVersion::PARQUET_1_0)
2415 .set_compression(Compression::SNAPPY)
2416 .build();
2417 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2418 }
2419
2420 #[test]
2421 fn test_column_writer_compression_v2() {
2422 let props = WriterProperties::builder()
2423 .set_writer_version(WriterVersion::PARQUET_2_0)
2424 .set_compression(Compression::SNAPPY)
2425 .build();
2426 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2427 }
2428
    #[test]
    fn test_column_writer_add_data_pages_with_dict() {
        // A tiny data page size limit plus a small write batch forces the
        // writer to split the 10 values into multiple dictionary-encoded
        // data pages.
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_data_page_size_limit(10)
                .set_write_batch_size(3)
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the borrow on `file` before reading it back.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut res = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            res.push((page.page_type(), page.num_values(), page.buffer().len()));
        }
        // One dictionary page (10 distinct i32 values = 40 bytes), followed
        // by two data pages split by the 10-byte page size limit.
        assert_eq!(
            res,
            vec![
                (PageType::DICTIONARY_PAGE, 10, 40),
                (PageType::DATA_PAGE, 9, 10),
                (PageType::DATA_PAGE, 1, 3),
            ]
        );
        // Encoding stats reflect one PLAIN dictionary page and two
        // RLE_DICTIONARY data pages.
        assert_eq!(
            r.metadata.page_encoding_stats(),
            Some(&vec![
                PageEncodingStats {
                    page_type: PageType::DICTIONARY_PAGE,
                    encoding: Encoding::PLAIN,
                    count: 1
                },
                PageEncodingStats {
                    page_type: PageType::DATA_PAGE,
                    encoding: Encoding::RLE_DICTIONARY,
                    count: 2,
                }
            ])
        );
    }
2491
    #[test]
    fn test_column_writer_column_data_page_size_limit() {
        // A per-column page size limit overrides the writer-wide default:
        // column "col" (10-byte limit) splits into four pages, while column
        // "other" uses the 1000-byte default and fits in one.
        let props = Arc::new(
            WriterProperties::builder()
                .set_writer_version(WriterVersion::PARQUET_1_0)
                .set_dictionary_enabled(false)
                .set_data_page_size_limit(1000)
                .set_column_data_page_size_limit(ColumnPath::from("col"), 10)
                .set_write_batch_size(3)
                .build(),
        );
        let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

        let col_values =
            write_and_collect_page_values(ColumnPath::from("col"), Arc::clone(&props), data);
        let other_values = write_and_collect_page_values(ColumnPath::from("other"), props, data);

        // The limit is checked per 3-value write batch, hence 3+3+3+1 pages.
        assert_eq!(col_values, vec![3, 3, 3, 1]);
        assert_eq!(other_values, vec![10]);
    }
2512
2513 #[test]
2514 fn test_bool_statistics() {
2515 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
2516 assert!(!stats.is_min_max_backwards_compatible());
2519 if let Statistics::Boolean(stats) = stats {
2520 assert_eq!(stats.min_opt().unwrap(), &false);
2521 assert_eq!(stats.max_opt().unwrap(), &true);
2522 } else {
2523 panic!("expecting Statistics::Boolean, got {stats:?}");
2524 }
2525 }
2526
2527 #[test]
2528 fn test_int32_statistics() {
2529 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2530 assert!(stats.is_min_max_backwards_compatible());
2531 if let Statistics::Int32(stats) = stats {
2532 assert_eq!(stats.min_opt().unwrap(), &-2);
2533 assert_eq!(stats.max_opt().unwrap(), &3);
2534 } else {
2535 panic!("expecting Statistics::Int32, got {stats:?}");
2536 }
2537 }
2538
2539 #[test]
2540 fn test_int64_statistics() {
2541 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2542 assert!(stats.is_min_max_backwards_compatible());
2543 if let Statistics::Int64(stats) = stats {
2544 assert_eq!(stats.min_opt().unwrap(), &-2);
2545 assert_eq!(stats.max_opt().unwrap(), &3);
2546 } else {
2547 panic!("expecting Statistics::Int64, got {stats:?}");
2548 }
2549 }
2550
2551 #[test]
2552 fn test_int96_statistics() {
2553 let input = vec![
2554 Int96::from(vec![1, 20, 30]),
2555 Int96::from(vec![3, 20, 10]),
2556 Int96::from(vec![0, 20, 30]),
2557 Int96::from(vec![2, 20, 30]),
2558 ]
2559 .into_iter()
2560 .collect::<Vec<Int96>>();
2561
2562 let stats = statistics_roundtrip::<Int96Type>(&input);
2563 assert!(!stats.is_min_max_backwards_compatible());
2564 if let Statistics::Int96(stats) = stats {
2565 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2566 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![2, 20, 30]));
2567 } else {
2568 panic!("expecting Statistics::Int96, got {stats:?}");
2569 }
2570 }
2571
2572 #[test]
2573 fn test_float_statistics() {
2574 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2575 assert!(stats.is_min_max_backwards_compatible());
2576 if let Statistics::Float(stats) = stats {
2577 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2578 assert_eq!(stats.max_opt().unwrap(), &3.0);
2579 } else {
2580 panic!("expecting Statistics::Float, got {stats:?}");
2581 }
2582 }
2583
2584 #[test]
2585 fn test_double_statistics() {
2586 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2587 assert!(stats.is_min_max_backwards_compatible());
2588 if let Statistics::Double(stats) = stats {
2589 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2590 assert_eq!(stats.max_opt().unwrap(), &3.0);
2591 } else {
2592 panic!("expecting Statistics::Double, got {stats:?}");
2593 }
2594 }
2595
2596 #[test]
2597 fn test_byte_array_statistics() {
2598 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2599 .iter()
2600 .map(|&s| s.into())
2601 .collect::<Vec<_>>();
2602
2603 let stats = statistics_roundtrip::<ByteArrayType>(&input);
2604 assert!(!stats.is_min_max_backwards_compatible());
2605 if let Statistics::ByteArray(stats) = stats {
2606 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2607 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2608 } else {
2609 panic!("expecting Statistics::ByteArray, got {stats:?}");
2610 }
2611 }
2612
2613 #[test]
2614 fn test_fixed_len_byte_array_statistics() {
2615 let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
2616 .iter()
2617 .map(|&s| ByteArray::from(s).into())
2618 .collect::<Vec<_>>();
2619
2620 let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2621 assert!(!stats.is_min_max_backwards_compatible());
2622 if let Statistics::FixedLenByteArray(stats) = stats {
2623 let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
2624 assert_eq!(stats.min_opt().unwrap(), &expected_min);
2625 let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
2626 assert_eq!(stats.max_opt().unwrap(), &expected_max);
2627 } else {
2628 panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2629 }
2630 }
2631
2632 #[test]
2633 fn test_column_writer_check_float16_min_max() {
2634 let input = [
2635 -f16::ONE,
2636 f16::from_f32(3.0),
2637 -f16::from_f32(2.0),
2638 f16::from_f32(2.0),
2639 ]
2640 .into_iter()
2641 .map(|s| ByteArray::from(s).into())
2642 .collect::<Vec<_>>();
2643
2644 let stats = float16_statistics_roundtrip(&input);
2645 assert!(stats.is_min_max_backwards_compatible());
2646 assert_eq!(
2647 stats.min_opt().unwrap(),
2648 &ByteArray::from(-f16::from_f32(2.0))
2649 );
2650 assert_eq!(
2651 stats.max_opt().unwrap(),
2652 &ByteArray::from(f16::from_f32(3.0))
2653 );
2654 }
2655
    #[test]
    fn test_column_writer_check_float16_nan_middle() {
        // NOTE(review): this test body is identical to
        // `test_float16_statistics_nan_middle` — consider removing one copy.
        // NaN values must be ignored when computing min/max.
        let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
            .into_iter()
            .map(|s| ByteArray::from(s).into())
            .collect::<Vec<_>>();

        let stats = float16_statistics_roundtrip(&input);
        assert!(stats.is_min_max_backwards_compatible());
        assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
        assert_eq!(
            stats.max_opt().unwrap(),
            &ByteArray::from(f16::ONE + f16::ONE)
        );
    }
2671
2672 #[test]
2673 fn test_float16_statistics_nan_middle() {
2674 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2675 .into_iter()
2676 .map(|s| ByteArray::from(s).into())
2677 .collect::<Vec<_>>();
2678
2679 let stats = float16_statistics_roundtrip(&input);
2680 assert!(stats.is_min_max_backwards_compatible());
2681 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2682 assert_eq!(
2683 stats.max_opt().unwrap(),
2684 &ByteArray::from(f16::ONE + f16::ONE)
2685 );
2686 }
2687
2688 #[test]
2689 fn test_float16_statistics_nan_start() {
2690 let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2691 .into_iter()
2692 .map(|s| ByteArray::from(s).into())
2693 .collect::<Vec<_>>();
2694
2695 let stats = float16_statistics_roundtrip(&input);
2696 assert!(stats.is_min_max_backwards_compatible());
2697 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2698 assert_eq!(
2699 stats.max_opt().unwrap(),
2700 &ByteArray::from(f16::ONE + f16::ONE)
2701 );
2702 }
2703
2704 #[test]
2705 fn test_float16_statistics_nan_only() {
2706 let input = [f16::NAN, f16::NAN]
2707 .into_iter()
2708 .map(|s| ByteArray::from(s).into())
2709 .collect::<Vec<_>>();
2710
2711 let stats = float16_statistics_roundtrip(&input);
2712 assert!(stats.min_bytes_opt().is_none());
2713 assert!(stats.max_bytes_opt().is_none());
2714 assert!(stats.is_min_max_backwards_compatible());
2715 }
2716
2717 #[test]
2718 fn test_float16_statistics_zero_only() {
2719 let input = [f16::ZERO]
2720 .into_iter()
2721 .map(|s| ByteArray::from(s).into())
2722 .collect::<Vec<_>>();
2723
2724 let stats = float16_statistics_roundtrip(&input);
2725 assert!(stats.is_min_max_backwards_compatible());
2726 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2727 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2728 }
2729
2730 #[test]
2731 fn test_float16_statistics_neg_zero_only() {
2732 let input = [f16::NEG_ZERO]
2733 .into_iter()
2734 .map(|s| ByteArray::from(s).into())
2735 .collect::<Vec<_>>();
2736
2737 let stats = float16_statistics_roundtrip(&input);
2738 assert!(stats.is_min_max_backwards_compatible());
2739 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2740 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2741 }
2742
2743 #[test]
2744 fn test_float16_statistics_zero_min() {
2745 let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2746 .into_iter()
2747 .map(|s| ByteArray::from(s).into())
2748 .collect::<Vec<_>>();
2749
2750 let stats = float16_statistics_roundtrip(&input);
2751 assert!(stats.is_min_max_backwards_compatible());
2752 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2753 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2754 }
2755
2756 #[test]
2757 fn test_float16_statistics_neg_zero_max() {
2758 let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2759 .into_iter()
2760 .map(|s| ByteArray::from(s).into())
2761 .collect::<Vec<_>>();
2762
2763 let stats = float16_statistics_roundtrip(&input);
2764 assert!(stats.is_min_max_backwards_compatible());
2765 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2766 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2767 }
2768
2769 #[test]
2770 fn test_float_statistics_nan_middle() {
2771 let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
2772 assert!(stats.is_min_max_backwards_compatible());
2773 if let Statistics::Float(stats) = stats {
2774 assert_eq!(stats.min_opt().unwrap(), &1.0);
2775 assert_eq!(stats.max_opt().unwrap(), &2.0);
2776 } else {
2777 panic!("expecting Statistics::Float");
2778 }
2779 }
2780
2781 #[test]
2782 fn test_float_statistics_nan_start() {
2783 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2784 assert!(stats.is_min_max_backwards_compatible());
2785 if let Statistics::Float(stats) = stats {
2786 assert_eq!(stats.min_opt().unwrap(), &1.0);
2787 assert_eq!(stats.max_opt().unwrap(), &2.0);
2788 } else {
2789 panic!("expecting Statistics::Float");
2790 }
2791 }
2792
2793 #[test]
2794 fn test_float_statistics_nan_only() {
2795 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2796 assert!(stats.min_bytes_opt().is_none());
2797 assert!(stats.max_bytes_opt().is_none());
2798 assert!(stats.is_min_max_backwards_compatible());
2799 assert!(matches!(stats, Statistics::Float(_)));
2800 }
2801
2802 #[test]
2803 fn test_float_statistics_zero_only() {
2804 let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2805 assert!(stats.is_min_max_backwards_compatible());
2806 if let Statistics::Float(stats) = stats {
2807 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2808 assert!(stats.min_opt().unwrap().is_sign_negative());
2809 assert_eq!(stats.max_opt().unwrap(), &0.0);
2810 assert!(stats.max_opt().unwrap().is_sign_positive());
2811 } else {
2812 panic!("expecting Statistics::Float");
2813 }
2814 }
2815
2816 #[test]
2817 fn test_float_statistics_neg_zero_only() {
2818 let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2819 assert!(stats.is_min_max_backwards_compatible());
2820 if let Statistics::Float(stats) = stats {
2821 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2822 assert!(stats.min_opt().unwrap().is_sign_negative());
2823 assert_eq!(stats.max_opt().unwrap(), &0.0);
2824 assert!(stats.max_opt().unwrap().is_sign_positive());
2825 } else {
2826 panic!("expecting Statistics::Float");
2827 }
2828 }
2829
2830 #[test]
2831 fn test_float_statistics_zero_min() {
2832 let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2833 assert!(stats.is_min_max_backwards_compatible());
2834 if let Statistics::Float(stats) = stats {
2835 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2836 assert!(stats.min_opt().unwrap().is_sign_negative());
2837 assert_eq!(stats.max_opt().unwrap(), &2.0);
2838 } else {
2839 panic!("expecting Statistics::Float");
2840 }
2841 }
2842
2843 #[test]
2844 fn test_float_statistics_neg_zero_max() {
2845 let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2846 assert!(stats.is_min_max_backwards_compatible());
2847 if let Statistics::Float(stats) = stats {
2848 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2849 assert_eq!(stats.max_opt().unwrap(), &0.0);
2850 assert!(stats.max_opt().unwrap().is_sign_positive());
2851 } else {
2852 panic!("expecting Statistics::Float");
2853 }
2854 }
2855
2856 #[test]
2857 fn test_double_statistics_nan_middle() {
2858 let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2859 assert!(stats.is_min_max_backwards_compatible());
2860 if let Statistics::Double(stats) = stats {
2861 assert_eq!(stats.min_opt().unwrap(), &1.0);
2862 assert_eq!(stats.max_opt().unwrap(), &2.0);
2863 } else {
2864 panic!("expecting Statistics::Double");
2865 }
2866 }
2867
2868 #[test]
2869 fn test_double_statistics_nan_start() {
2870 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2871 assert!(stats.is_min_max_backwards_compatible());
2872 if let Statistics::Double(stats) = stats {
2873 assert_eq!(stats.min_opt().unwrap(), &1.0);
2874 assert_eq!(stats.max_opt().unwrap(), &2.0);
2875 } else {
2876 panic!("expecting Statistics::Double");
2877 }
2878 }
2879
2880 #[test]
2881 fn test_double_statistics_nan_only() {
2882 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2883 assert!(stats.min_bytes_opt().is_none());
2884 assert!(stats.max_bytes_opt().is_none());
2885 assert!(matches!(stats, Statistics::Double(_)));
2886 assert!(stats.is_min_max_backwards_compatible());
2887 }
2888
2889 #[test]
2890 fn test_double_statistics_zero_only() {
2891 let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2892 assert!(stats.is_min_max_backwards_compatible());
2893 if let Statistics::Double(stats) = stats {
2894 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2895 assert!(stats.min_opt().unwrap().is_sign_negative());
2896 assert_eq!(stats.max_opt().unwrap(), &0.0);
2897 assert!(stats.max_opt().unwrap().is_sign_positive());
2898 } else {
2899 panic!("expecting Statistics::Double");
2900 }
2901 }
2902
2903 #[test]
2904 fn test_double_statistics_neg_zero_only() {
2905 let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2906 assert!(stats.is_min_max_backwards_compatible());
2907 if let Statistics::Double(stats) = stats {
2908 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2909 assert!(stats.min_opt().unwrap().is_sign_negative());
2910 assert_eq!(stats.max_opt().unwrap(), &0.0);
2911 assert!(stats.max_opt().unwrap().is_sign_positive());
2912 } else {
2913 panic!("expecting Statistics::Double");
2914 }
2915 }
2916
2917 #[test]
2918 fn test_double_statistics_zero_min() {
2919 let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2920 assert!(stats.is_min_max_backwards_compatible());
2921 if let Statistics::Double(stats) = stats {
2922 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2923 assert!(stats.min_opt().unwrap().is_sign_negative());
2924 assert_eq!(stats.max_opt().unwrap(), &2.0);
2925 } else {
2926 panic!("expecting Statistics::Double");
2927 }
2928 }
2929
2930 #[test]
2931 fn test_double_statistics_neg_zero_max() {
2932 let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2933 assert!(stats.is_min_max_backwards_compatible());
2934 if let Statistics::Double(stats) = stats {
2935 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2936 assert_eq!(stats.max_opt().unwrap(), &0.0);
2937 assert!(stats.max_opt().unwrap().is_sign_positive());
2938 } else {
2939 panic!("expecting Statistics::Double");
2940 }
2941 }
2942
2943 #[test]
2944 fn test_compare_greater_byte_array_decimals() {
2945 assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2946 assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2947 assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2948 assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2949 assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2950 assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2951 assert!(!compare_greater_byte_array_decimals(
2952 &[0u8, 1u8,],
2953 &[1u8, 0u8,],
2954 ),);
2955 assert!(!compare_greater_byte_array_decimals(
2956 &[255u8, 35u8, 0u8, 0u8,],
2957 &[0u8,],
2958 ),);
2959 assert!(compare_greater_byte_array_decimals(
2960 &[0u8,],
2961 &[255u8, 35u8, 0u8, 0u8,],
2962 ),);
2963 }
2964
    #[test]
    fn test_column_index_with_null_pages() {
        // A page consisting solely of nulls: the column index must flag it as
        // a null page with no min/max but with a null count and a definition
        // level histogram.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        // Four definition levels of 0 => four nulls, zero values.
        writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

        let r = writer.close().unwrap();
        assert!(r.column_index.is_some());
        let col_idx = r.column_index.unwrap();
        let col_idx = match col_idx {
            ColumnIndexMetaData::INT32(col_idx) => col_idx,
            _ => panic!("wrong stats type"),
        };
        assert!(col_idx.is_null_page(0));
        // Null pages carry no min/max values.
        assert!(col_idx.min_value(0).is_none());
        assert!(col_idx.max_value(0).is_none());
        assert!(col_idx.null_count(0).is_some());
        assert_eq!(col_idx.null_count(0), Some(4));
        // Max repetition level is 0, so there is no repetition histogram.
        assert!(col_idx.repetition_level_histogram(0).is_none());
        assert!(col_idx.definition_level_histogram(0).is_some());
        // All four entries have definition level 0.
        assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
    }
2994
2995 #[test]
2996 fn test_column_offset_index_metadata() {
2997 let page_writer = get_test_page_writer();
3000 let props = Default::default();
3001 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3002 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3003 writer.flush_data_pages().unwrap();
3005 writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
3007
3008 let r = writer.close().unwrap();
3009 let column_index = r.column_index.unwrap();
3010 let offset_index = r.offset_index.unwrap();
3011
3012 assert_eq!(8, r.rows_written);
3013
3014 let column_index = match column_index {
3016 ColumnIndexMetaData::INT32(column_index) => column_index,
3017 _ => panic!("wrong stats type"),
3018 };
3019 assert_eq!(2, column_index.num_pages());
3020 assert_eq!(2, offset_index.page_locations.len());
3021 assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
3022 for idx in 0..2 {
3023 assert!(!column_index.is_null_page(idx));
3024 assert_eq!(0, column_index.null_count(0).unwrap());
3025 }
3026
3027 if let Some(stats) = r.metadata.statistics() {
3028 assert_eq!(stats.null_count_opt(), Some(0));
3029 assert_eq!(stats.distinct_count_opt(), None);
3030 if let Statistics::Int32(stats) = stats {
3031 assert_eq!(stats.min_opt(), column_index.min_value(1));
3035 assert_eq!(stats.max_opt(), column_index.max_value(1));
3036 } else {
3037 panic!("expecting Statistics::Int32");
3038 }
3039 } else {
3040 panic!("metadata missing statistics");
3041 }
3042
3043 assert_eq!(0, offset_index.page_locations[0].first_row_index);
3045 assert_eq!(4, offset_index.page_locations[1].first_row_index);
3046 }
3047
    #[test]
    fn test_column_offset_index_metadata_truncating() {
        let page_writer = get_test_page_writer();
        // Disable chunk-statistics truncation so only the column index
        // (which uses DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH) truncates the
        // 200-byte values.
        let props = WriterProperties::builder()
            .set_statistics_truncate_length(None)
            .build()
            .into();
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 3];
        // 'a' x200, 'p' x200, 'b' x200 => min is the 'a' run, max the 'p' run.
        data[0].set_data(Bytes::from(vec![97_u8; 200]));
        data[1].set_data(Bytes::from(vec![112_u8; 200]));
        data[2].set_data(Bytes::from(vec![98_u8; 200]));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(3, r.rows_written);

        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                // Chunk statistics are untruncated; the index values are not.
                assert_ne!(stats.min_bytes_opt().unwrap(), column_index_min_value);
                assert_ne!(stats.max_bytes_opt().unwrap(), column_index_max_value);

                assert_eq!(
                    column_index_min_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );
                assert_eq!(column_index_min_value, &[97_u8; 64]);
                assert_eq!(
                    column_index_max_value.len(),
                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
                );

                // The truncated max has its final byte incremented so it
                // still upper-bounds the real maximum value.
                assert_eq!(
                    *column_index_max_value.last().unwrap(),
                    *column_index_max_value.first().unwrap() + 1
                );
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3122
    #[test]
    fn test_column_offset_index_truncating_spec_example() {
        let page_writer = get_test_page_writer();

        // Truncate the column index min/max to a single byte, mirroring the
        // "Blart Versenwald III" example from the Parquet page-index spec.
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![FixedLenByteArray::default(); 1];
        data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

        writer.write_batch(&data, None, None).unwrap();

        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();
        let column_index = r.column_index.unwrap();
        let offset_index = r.offset_index.unwrap();

        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };

        assert_eq!(1, r.rows_written);

        assert_eq!(1, column_index.num_pages());
        assert_eq!(1, offset_index.page_locations.len());
        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
        assert!(!column_index.is_null_page(0));
        assert_eq!(Some(0), column_index.null_count(0));

        if let Some(stats) = r.metadata.statistics() {
            assert_eq!(stats.null_count_opt(), Some(0));
            assert_eq!(stats.distinct_count_opt(), None);
            if let Statistics::FixedLenByteArray(_stats) = stats {
                let column_index_min_value = column_index.min_value(0).unwrap();
                let column_index_max_value = column_index.max_value(0).unwrap();

                assert_eq!(column_index_min_value.len(), 1);
                assert_eq!(column_index_max_value.len(), 1);

                // Min is the prefix "B"; max is incremented to "C" so it
                // still upper-bounds the original value.
                assert_eq!("B".as_bytes(), column_index_min_value);
                assert_eq!("C".as_bytes(), column_index_max_value);

                // Chunk statistics are not truncated to 1 byte.
                assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
                assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
            } else {
                panic!("expecting Statistics::FixedLenByteArray");
            }
        } else {
            panic!("metadata missing statistics");
        }
    }
3182
    #[test]
    fn test_float16_min_max_no_truncation() {
        // Even with a 1-byte truncate length, FLOAT16 min/max must be kept
        // intact — truncating would corrupt the fixed 2-byte value.
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, props);

        let expected_value = f16::PI.to_le_bytes().to_vec();
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // Column index min/max are the full, untruncated bytes.
        let column_index = r.column_index.unwrap();
        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };
        let column_index_min_bytes = column_index.min_value(0).unwrap();
        let column_index_max_bytes = column_index.max_value(0).unwrap();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // Chunk statistics are untruncated as well.
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }
3221
    #[test]
    fn test_decimal_min_max_no_truncation() {
        // Even with a 1-byte truncate length, fixed-length decimal min/max
        // must be kept intact — truncating would change the decimal value.
        let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
        let props = Arc::new(builder.build());
        let page_writer = get_test_page_writer();
        let mut writer =
            get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

        // A 16-byte two's-complement decimal encoding.
        let expected_value = vec![
            255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
            231u8, 90u8, 0u8, 0u8,
        ];
        let data = vec![ByteArray::from(expected_value.clone()).into()];
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        // Column index min/max are the full, untruncated bytes.
        let column_index = r.column_index.unwrap();
        let column_index = match column_index {
            ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
            _ => panic!("wrong stats type"),
        };
        let column_index_min_bytes = column_index.min_value(0).unwrap();
        let column_index_max_bytes = column_index.max_value(0).unwrap();
        assert_eq!(expected_value, column_index_min_bytes);
        assert_eq!(expected_value, column_index_max_bytes);

        // Chunk statistics are untruncated as well.
        let stats = r.metadata.statistics().unwrap();
        if let Statistics::FixedLenByteArray(stats) = stats {
            let stats_min_bytes = stats.min_bytes_opt().unwrap();
            let stats_max_bytes = stats.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min_bytes);
            assert_eq!(expected_value, stats_max_bytes);
        } else {
            panic!("expecting Statistics::FixedLenByteArray");
        }
    }
3264
    #[test]
    fn test_statistics_truncating_byte_array_default() {
        let page_writer = get_test_page_writer();

        // Default writer properties: byte-array statistics are truncated to
        // the default length of 64 bytes.
        let props = WriterProperties::builder().build().into();
        let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

        let mut data = vec![ByteArray::default(); 1];
        data[0].set_data(Bytes::from(String::from(
            "This string is longer than 64 bytes, so it will almost certainly be truncated.",
        )));
        writer.write_batch(&data, None, None).unwrap();
        writer.flush_data_pages().unwrap();

        let r = writer.close().unwrap();

        assert_eq!(1, r.rows_written);

        let stats = r.metadata.statistics().expect("statistics");
        if let Statistics::ByteArray(_stats) = stats {
            let min_value = _stats.min_opt().unwrap();
            let max_value = _stats.max_opt().unwrap();

            // Truncation must be reported through the is-exact flags.
            assert!(!_stats.min_is_exact());
            assert!(!_stats.max_is_exact());

            let expected_len = 64;
            assert_eq!(min_value.len(), expected_len);
            assert_eq!(max_value.len(), expected_len);

            // Min is a plain 64-byte prefix; max has its last byte
            // incremented ('y' -> 'z') so it still upper-bounds the value.
            let expected_min =
                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
            assert_eq!(expected_min, min_value.as_bytes());
            let expected_max =
                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
            assert_eq!(expected_max, max_value.as_bytes());
        } else {
            panic!("expecting Statistics::ByteArray");
        }
    }
3307
3308 #[test]
3309 fn test_statistics_truncating_byte_array() {
3310 let page_writer = get_test_page_writer();
3311
3312 const TEST_TRUNCATE_LENGTH: usize = 1;
3313
3314 let builder =
3316 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3317 let props = Arc::new(builder.build());
3318 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3319
3320 let mut data = vec![ByteArray::default(); 1];
3321 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3323
3324 writer.write_batch(&data, None, None).unwrap();
3325
3326 writer.flush_data_pages().unwrap();
3327
3328 let r = writer.close().unwrap();
3329
3330 assert_eq!(1, r.rows_written);
3331
3332 let stats = r.metadata.statistics().expect("statistics");
3333 assert_eq!(stats.null_count_opt(), Some(0));
3334 assert_eq!(stats.distinct_count_opt(), None);
3335 if let Statistics::ByteArray(_stats) = stats {
3336 let min_value = _stats.min_opt().unwrap();
3337 let max_value = _stats.max_opt().unwrap();
3338
3339 assert!(!_stats.min_is_exact());
3340 assert!(!_stats.max_is_exact());
3341
3342 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3343 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3344
3345 assert_eq!("B".as_bytes(), min_value.as_bytes());
3346 assert_eq!("C".as_bytes(), max_value.as_bytes());
3347 } else {
3348 panic!("expecting Statistics::ByteArray");
3349 }
3350 }
3351
3352 #[test]
3353 fn test_statistics_truncating_fixed_len_byte_array() {
3354 let page_writer = get_test_page_writer();
3355
3356 const TEST_TRUNCATE_LENGTH: usize = 1;
3357
3358 let builder =
3360 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3361 let props = Arc::new(builder.build());
3362 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3363
3364 let mut data = vec![FixedLenByteArray::default(); 1];
3365
3366 const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3367 const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3368
3369 const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3371 [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3372
3373 data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3375
3376 writer.write_batch(&data, None, None).unwrap();
3377
3378 writer.flush_data_pages().unwrap();
3379
3380 let r = writer.close().unwrap();
3381
3382 assert_eq!(1, r.rows_written);
3383
3384 let stats = r.metadata.statistics().expect("statistics");
3385 assert_eq!(stats.null_count_opt(), Some(0));
3386 assert_eq!(stats.distinct_count_opt(), None);
3387 if let Statistics::FixedLenByteArray(_stats) = stats {
3388 let min_value = _stats.min_opt().unwrap();
3389 let max_value = _stats.max_opt().unwrap();
3390
3391 assert!(!_stats.min_is_exact());
3392 assert!(!_stats.max_is_exact());
3393
3394 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3395 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3396
3397 assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3398 assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3399
3400 let reconstructed_min = i128::from_be_bytes([
3401 min_value.as_bytes()[0],
3402 0,
3403 0,
3404 0,
3405 0,
3406 0,
3407 0,
3408 0,
3409 0,
3410 0,
3411 0,
3412 0,
3413 0,
3414 0,
3415 0,
3416 0,
3417 ]);
3418
3419 let reconstructed_max = i128::from_be_bytes([
3420 max_value.as_bytes()[0],
3421 0,
3422 0,
3423 0,
3424 0,
3425 0,
3426 0,
3427 0,
3428 0,
3429 0,
3430 0,
3431 0,
3432 0,
3433 0,
3434 0,
3435 0,
3436 ]);
3437
3438 println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3440 assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3441 println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3442 assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3443 } else {
3444 panic!("expecting Statistics::FixedLenByteArray");
3445 }
3446 }
3447
3448 #[test]
3449 fn test_send() {
3450 fn test<T: Send>() {}
3451 test::<ColumnWriterImpl<Int32Type>>();
3452 }
3453
3454 #[test]
3455 fn test_increment() {
3456 let v = increment(vec![0, 0, 0]).unwrap();
3457 assert_eq!(&v, &[0, 0, 1]);
3458
3459 let v = increment(vec![0, 255, 255]).unwrap();
3461 assert_eq!(&v, &[1, 0, 0]);
3462
3463 let v = increment(vec![255, 255, 255]);
3465 assert!(v.is_none());
3466 }
3467
    #[test]
    fn test_increment_utf8() {
        // Asserts that `increment_utf8(o)` produces `expected`, that the result
        // is valid UTF-8, and that it compares greater than the input both as a
        // string and as raw ByteArray bytes.
        let test_inc = |o: &str, expected: &str| {
            if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
                // got the expected incremented result
                assert_eq!(v, expected);
                // the incremented string must sort after the original
                assert!(*v > *o);
                // and the raw byte representation must also compare greater
                let mut greater = ByteArray::new();
                greater.set_data(Bytes::from(v));
                let mut original = ByteArray::new();
                original.set_data(Bytes::from(o.as_bytes().to_vec()));
                assert!(greater > original);
            } else {
                panic!("Expected incremented UTF8 string to also be valid.");
            }
        };

        // basic ASCII increment
        test_inc("hello", "hellp");

        // last char is the max 1-byte scalar (U+007F): it is dropped and the
        // preceding char is incremented instead
        test_inc("a\u{7f}", "b");

        // only max 1-byte scalars: no valid successor exists
        assert!(increment_utf8("\u{7f}\u{7f}").is_none());

        // multi-byte (emoji) increment of the final character
        test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

        // 2-byte character increment without overflow
        test_inc("éééé", "éééê");

        // incrementing U+00FF stays representable (U+0100 is still 2 bytes)
        test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

        // last char is the max 2-byte scalar (U+07FF)
        test_inc("a\u{7ff}", "b");

        // only max 2-byte scalars
        assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

        // 3-byte character increment without overflow
        test_inc("ࠀࠀ", "ࠀࠁ");

        // last char is the max 3-byte scalar (U+FFFF)
        test_inc("a\u{ffff}", "b");

        // only max 3-byte scalars
        assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

        // 4-byte character increment without overflow
        test_inc("𐀀𐀀", "𐀀𐀁");

        // last char is the maximum unicode scalar value (U+10FFFF)
        test_inc("a\u{10ffff}", "b");

        // only maximum scalar values: nothing larger exists
        assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

        // U+D7FF + 1 would land in the surrogate range, which is not a valid
        // scalar, so the char is dropped and the previous one incremented
        test_inc("a\u{D7FF}", "b");
    }
3534
    #[test]
    fn test_truncate_utf8() {
        // No-op: truncating to the full byte length returns the input unchanged.
        let data = "❤️🧡💛💚💙💜";
        let r = truncate_utf8(data, data.len()).unwrap();
        assert_eq!(r.len(), data.len());
        assert_eq!(&r, data.as_bytes());

        // The cut must land on a char boundary, so 13 rounds down to 10 bytes.
        let r = truncate_utf8(data, 13).unwrap();
        assert_eq!(r.len(), 10);
        assert_eq!(&r, "❤️🧡".as_bytes());

        // A multi-byte char cannot be split: nothing fits in 1 byte.
        let r = truncate_utf8("\u{0836}", 1);
        assert!(r.is_none());

        // Truncate-and-increment on ASCII: keep 8 bytes, bump the last char.
        let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
        assert_eq!(&r, "yyyyyyyz".as_bytes());

        // 2-byte chars: 7 bytes rounds down to 3 chars (6 bytes), then increment.
        let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
        assert_eq!(&r, "ééê".as_bytes());

        // Incrementing U+00FF overflows into U+0100 (still 2 bytes) — allowed.
        let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
        assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

        // All chars are U+07FF (max 2-byte scalar): no successor fits.
        let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
        assert!(r.is_none());

        // 3-byte chars without overflow.
        let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
        assert_eq!(&r, "ࠀࠁ".as_bytes());

        // All chars are U+FFFF (max 3-byte scalar): no successor.
        let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
        assert!(r.is_none());

        // 4-byte chars without overflow.
        let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
        assert_eq!(&r, "𐀀𐀁".as_bytes());

        // All chars are U+10FFFF (max scalar): no successor exists.
        let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
        assert!(r.is_none());
    }
3586
    #[test]
    fn test_byte_array_truncate_invalid_utf8_statistics() {
        // A UTF8-annotated column whose values are NOT valid UTF-8 (0x80 bytes):
        // statistics truncation must fall back to binary truncation/increment
        // instead of failing on the invalid string data.
        let message_type = "
            message test_schema {
                OPTIONAL BYTE_ARRAY a (UTF8);
            }
        ";
        let schema = Arc::new(parse_message_type(message_type).unwrap());

        // Seven 32-byte values of 0x80; def levels interleave nulls.
        let data = vec![ByteArray::from(vec![128u8; 32]); 7];
        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
        let file: File = tempfile::tempfile().unwrap();
        let props = Arc::new(
            WriterProperties::builder()
                .set_statistics_enabled(EnabledStatistics::Chunk)
                .set_statistics_truncate_length(Some(8))
                .build(),
        );

        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
        let mut row_group_writer = writer.next_row_group().unwrap();

        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
        col_writer
            .typed::<ByteArrayType>()
            .write_batch(&data, Some(&def_levels), None)
            .unwrap();
        col_writer.close().unwrap();
        row_group_writer.close().unwrap();
        let file_metadata = writer.close().unwrap();
        let stats = file_metadata.row_group(0).column(0).statistics().unwrap();
        assert!(!stats.max_is_exact());
        // Binary increment of the truncated 8-byte prefix: last byte bumped
        // from 128 to 129 so the bound still exceeds the real max.
        assert_eq!(
            stats.max_bytes_opt().map(|v| v.to_vec()),
            Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
        );
    }
3629
3630 #[test]
3631 fn test_increment_max_binary_chars() {
3632 let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3633 assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3634
3635 let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3636 assert!(incremented.is_none())
3637 }
3638
3639 #[test]
3640 fn test_no_column_index_when_stats_disabled() {
3641 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3645 let props = Arc::new(
3646 WriterProperties::builder()
3647 .set_statistics_enabled(EnabledStatistics::None)
3648 .build(),
3649 );
3650 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3651 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3652
3653 let data = Vec::new();
3654 let def_levels = vec![0; 10];
3655 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3656 writer.flush_data_pages().unwrap();
3657
3658 let column_close_result = writer.close().unwrap();
3659 assert!(column_close_result.offset_index.is_some());
3660 assert!(column_close_result.column_index.is_none());
3661 }
3662
3663 #[test]
3664 fn test_no_offset_index_when_disabled() {
3665 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3667 let props = Arc::new(
3668 WriterProperties::builder()
3669 .set_statistics_enabled(EnabledStatistics::None)
3670 .set_offset_index_disabled(true)
3671 .build(),
3672 );
3673 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3674 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3675
3676 let data = Vec::new();
3677 let def_levels = vec![0; 10];
3678 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3679 writer.flush_data_pages().unwrap();
3680
3681 let column_close_result = writer.close().unwrap();
3682 assert!(column_close_result.offset_index.is_none());
3683 assert!(column_close_result.column_index.is_none());
3684 }
3685
3686 #[test]
3687 fn test_offset_index_overridden() {
3688 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3690 let props = Arc::new(
3691 WriterProperties::builder()
3692 .set_statistics_enabled(EnabledStatistics::Page)
3693 .set_offset_index_disabled(true)
3694 .build(),
3695 );
3696 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3697 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3698
3699 let data = Vec::new();
3700 let def_levels = vec![0; 10];
3701 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3702 writer.flush_data_pages().unwrap();
3703
3704 let column_close_result = writer.close().unwrap();
3705 assert!(column_close_result.offset_index.is_some());
3706 assert!(column_close_result.column_index.is_some());
3707 }
3708
3709 #[test]
3710 fn test_boundary_order() -> Result<()> {
3711 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3712 let column_close_result = write_multiple_pages::<Int32Type>(
3714 &descr,
3715 &[
3716 &[Some(-10), Some(10)],
3717 &[Some(-5), Some(11)],
3718 &[None],
3719 &[Some(-5), Some(11)],
3720 ],
3721 )?;
3722 let boundary_order = column_close_result
3723 .column_index
3724 .unwrap()
3725 .get_boundary_order();
3726 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3727
3728 let column_close_result = write_multiple_pages::<Int32Type>(
3730 &descr,
3731 &[
3732 &[Some(10), Some(11)],
3733 &[Some(5), Some(11)],
3734 &[None],
3735 &[Some(-5), Some(0)],
3736 ],
3737 )?;
3738 let boundary_order = column_close_result
3739 .column_index
3740 .unwrap()
3741 .get_boundary_order();
3742 assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));
3743
3744 let column_close_result = write_multiple_pages::<Int32Type>(
3746 &descr,
3747 &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3748 )?;
3749 let boundary_order = column_close_result
3750 .column_index
3751 .unwrap()
3752 .get_boundary_order();
3753 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3754
3755 let column_close_result =
3757 write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3758 let boundary_order = column_close_result
3759 .column_index
3760 .unwrap()
3761 .get_boundary_order();
3762 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3763
3764 let column_close_result =
3766 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3767 let boundary_order = column_close_result
3768 .column_index
3769 .unwrap()
3770 .get_boundary_order();
3771 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3772
3773 let column_close_result =
3775 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3776 let boundary_order = column_close_result
3777 .column_index
3778 .unwrap()
3779 .get_boundary_order();
3780 assert_eq!(boundary_order, Some(BoundaryOrder::ASCENDING));
3781
3782 let column_close_result = write_multiple_pages::<Int32Type>(
3784 &descr,
3785 &[
3786 &[Some(10), Some(11)],
3787 &[Some(11), Some(16)],
3788 &[None],
3789 &[Some(-5), Some(0)],
3790 ],
3791 )?;
3792 let boundary_order = column_close_result
3793 .column_index
3794 .unwrap()
3795 .get_boundary_order();
3796 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3797
3798 let column_close_result = write_multiple_pages::<Int32Type>(
3800 &descr,
3801 &[
3802 &[Some(1), Some(9)],
3803 &[Some(2), Some(8)],
3804 &[None],
3805 &[Some(3), Some(7)],
3806 ],
3807 )?;
3808 let boundary_order = column_close_result
3809 .column_index
3810 .unwrap()
3811 .get_boundary_order();
3812 assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));
3813
3814 Ok(())
3815 }
3816
    #[test]
    fn test_boundary_order_logical_type() -> Result<()> {
        // The same sequence of FLBA values must yield different boundary orders
        // depending on the logical type: Float16 columns compare as f16 numbers,
        // while a plain FIXED_LEN_BYTE_ARRAY compares by raw bytes.
        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
        let fba_descr = {
            let tpe = SchemaType::primitive_type_builder(
                "col",
                FixedLenByteArrayType::get_physical_type(),
            )
            .with_length(2)
            .build()?;
            Arc::new(ColumnDescriptor::new(
                Arc::new(tpe),
                1,
                0,
                ColumnPath::from("col"),
            ))
        };

        // One page per value: 1.0, 0.0, -0.0, -1.0 — strictly descending as f16.
        let values: &[&[Option<FixedLenByteArray>]] = &[
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
            &[Some(FixedLenByteArray::from(ByteArray::from(
                f16::NEG_ZERO,
            )))],
            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
        ];

        // As Float16 the pages sort numerically, so the index sees DESCENDING.
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::DESCENDING));

        // As raw bytes the same sequence is not monotonic (the sign bit flips
        // the byte ordering), so the index reports UNORDERED.
        let column_close_result =
            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
        let boundary_order = column_close_result
            .column_index
            .unwrap()
            .get_boundary_order();
        assert_eq!(boundary_order, Some(BoundaryOrder::UNORDERED));

        Ok(())
    }
3866
3867 #[test]
3868 fn test_interval_stats_should_not_have_min_max() {
3869 let input = [
3870 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3871 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3872 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3873 ]
3874 .into_iter()
3875 .map(|s| ByteArray::from(s).into())
3876 .collect::<Vec<_>>();
3877
3878 let page_writer = get_test_page_writer();
3879 let mut writer = get_test_interval_column_writer(page_writer);
3880 writer.write_batch(&input, None, None).unwrap();
3881
3882 let metadata = writer.close().unwrap().metadata;
3883 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3884 stats.clone()
3885 } else {
3886 panic!("metadata missing statistics");
3887 };
3888 assert!(stats.min_bytes_opt().is_none());
3889 assert!(stats.max_bytes_opt().is_none());
3890 }
3891
    #[test]
    #[cfg(feature = "arrow")]
    fn test_column_writer_get_estimated_total_bytes() {
        // The estimate starts at zero and grows as each data page is flushed.
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
        assert_eq!(writer.get_estimated_total_bytes(), 0);

        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_one_page = writer.get_estimated_total_bytes();
        // NOTE(review): 20 is the observed size of one flushed page; the exact
        // value depends on page header/encoding internals — update if those change.
        assert_eq!(size_with_one_page, 20);

        writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
        writer.add_data_page().unwrap();
        let size_with_two_pages = writer.get_estimated_total_bytes();
        // The second page is 21 bytes; the estimate is cumulative.
        assert_eq!(size_with_two_pages, 20 + 21);
    }
3911
3912 fn write_multiple_pages<T: DataType>(
3913 column_descr: &Arc<ColumnDescriptor>,
3914 pages: &[&[Option<T::T>]],
3915 ) -> Result<ColumnCloseResult> {
3916 let column_writer = get_column_writer(
3917 column_descr.clone(),
3918 Default::default(),
3919 get_test_page_writer(),
3920 );
3921 let mut writer = get_typed_column_writer::<T>(column_writer);
3922
3923 for &page in pages {
3924 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3925 let def_levels = page
3926 .iter()
3927 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3928 .collect::<Vec<_>>();
3929 writer.write_batch(&values, Some(&def_levels), None)?;
3930 writer.flush_data_pages()?;
3931 }
3932
3933 writer.close()
3934 }
3935
    /// Generates random values/levels within the given bounds and runs a full
    /// write-then-read roundtrip via [`column_roundtrip`].
    fn column_roundtrip_random<T: DataType>(
        props: WriterProperties,
        max_size: usize,
        min_value: T::T,
        max_value: T::T,
        max_def_level: i16,
        max_rep_level: i16,
    ) where
        T::T: PartialOrd + SampleUniform + Copy,
    {
        // Number of non-null values to generate (entries at max def level).
        let mut num_values: usize = 0;

        let mut buf: Vec<i16> = Vec::new();
        let def_levels = if max_def_level > 0 {
            random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
            // Only slots at the max def level carry an actual value.
            for &dl in &buf[..] {
                if dl == max_def_level {
                    num_values += 1;
                }
            }
            Some(&buf[..])
        } else {
            // Required column: every slot is a value.
            num_values = max_size;
            None
        };

        let mut buf: Vec<i16> = Vec::new();
        let rep_levels = if max_rep_level > 0 {
            random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
            // The first rep level must be 0: a batch has to start a new row.
            buf[0] = 0; Some(&buf[..])
        } else {
            None
        };

        let mut values: Vec<T::T> = Vec::new();
        random_numbers_range(num_values, min_value, max_value, &mut values);

        column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
    }
3979
    /// Writes `values` (with optional def/rep levels) through a serialized page
    /// writer into a temp file, reads them back with a column reader, and
    /// asserts the values, levels, and row count survive the roundtrip.
    fn column_roundtrip<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));

        // Derive the column's max levels from the data actually supplied.
        let max_def_level = match def_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        let max_rep_level = match rep_levels {
            Some(buf) => *buf.iter().max().unwrap_or(&0i16),
            None => 0i16,
        };

        // Batch size must cover values and both level slices.
        let mut max_batch_size = values.len();
        if let Some(levels) = def_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }
        if let Some(levels) = rep_levels {
            max_batch_size = max_batch_size.max(levels.len());
        }

        let mut writer =
            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));

        let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
        assert_eq!(values_written, values.len());
        let result = writer.close().unwrap();

        // Release the borrow on `file` so it can be handed to the reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &result.metadata,
                result.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );
        let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);

        let mut actual_values = Vec::with_capacity(max_batch_size);
        let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
        let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));

        let (_, values_read, levels_read) = reader
            .read_records(
                max_batch_size,
                actual_def_levels.as_mut(),
                actual_rep_levels.as_mut(),
                &mut actual_values,
            )
            .unwrap();

        // Values and levels must match exactly what was written.
        assert_eq!(&actual_values[..values_read], values);
        match actual_def_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
            None => assert_eq!(None, def_levels),
        }
        match actual_rep_levels {
            Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
            None => assert_eq!(None, rep_levels),
        }

        // Cross-check rows_written: with rep levels a row starts at level 0;
        // otherwise every level (or every value) is its own row.
        if let Some(levels) = actual_rep_levels {
            let mut actual_rows_written = 0;
            for l in levels {
                if l == 0 {
                    actual_rows_written += 1;
                }
            }
            assert_eq!(actual_rows_written, result.rows_written);
        } else if actual_def_levels.is_some() {
            assert_eq!(levels_read as u64, result.rows_written);
        } else {
            assert_eq!(values_read as u64, result.rows_written);
        }
    }
4074
4075 fn column_write_and_get_metadata<T: DataType>(
4078 props: WriterProperties,
4079 values: &[T::T],
4080 ) -> ColumnChunkMetaData {
4081 let page_writer = get_test_page_writer();
4082 let props = Arc::new(props);
4083 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4084 writer.write_batch(values, None, None).unwrap();
4085 writer.close().unwrap().metadata
4086 }
4087
4088 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4090 PageEncodingStats {
4091 page_type,
4092 encoding,
4093 count,
4094 }
4095 }
4096
4097 fn check_encoding_write_support<T: DataType>(
4101 version: WriterVersion,
4102 dict_enabled: bool,
4103 data: &[T::T],
4104 dictionary_page_offset: Option<i64>,
4105 encodings: &[Encoding],
4106 page_encoding_stats: &[PageEncodingStats],
4107 ) {
4108 let props = WriterProperties::builder()
4109 .set_writer_version(version)
4110 .set_dictionary_enabled(dict_enabled)
4111 .build();
4112 let meta = column_write_and_get_metadata::<T>(props, data);
4113 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4114 assert_eq!(meta.encodings().collect::<Vec<_>>(), encodings);
4115 assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4116 }
4117
4118 fn get_test_column_writer<'a, T: DataType>(
4120 page_writer: Box<dyn PageWriter + 'a>,
4121 max_def_level: i16,
4122 max_rep_level: i16,
4123 props: WriterPropertiesPtr,
4124 ) -> ColumnWriterImpl<'a, T> {
4125 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4126 let column_writer = get_column_writer(descr, props, page_writer);
4127 get_typed_column_writer::<T>(column_writer)
4128 }
4129
4130 fn get_test_column_writer_with_path<'a, T: DataType>(
4131 page_writer: Box<dyn PageWriter + 'a>,
4132 max_def_level: i16,
4133 max_rep_level: i16,
4134 props: WriterPropertiesPtr,
4135 path: ColumnPath,
4136 ) -> ColumnWriterImpl<'a, T> {
4137 let descr = Arc::new(get_test_column_descr_with_path::<T>(
4138 max_def_level,
4139 max_rep_level,
4140 path,
4141 ));
4142 let column_writer = get_column_writer(descr, props, page_writer);
4143 get_typed_column_writer::<T>(column_writer)
4144 }
4145
4146 fn get_test_column_reader<T: DataType>(
4148 page_reader: Box<dyn PageReader>,
4149 max_def_level: i16,
4150 max_rep_level: i16,
4151 ) -> ColumnReaderImpl<T> {
4152 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4153 let column_reader = get_column_reader(descr, page_reader);
4154 get_typed_column_reader::<T>(column_reader)
4155 }
4156
4157 fn get_test_column_descr<T: DataType>(
4159 max_def_level: i16,
4160 max_rep_level: i16,
4161 ) -> ColumnDescriptor {
4162 let path = ColumnPath::from("col");
4163 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4164 .with_length(1)
4167 .build()
4168 .unwrap();
4169 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4170 }
4171
4172 fn get_test_column_descr_with_path<T: DataType>(
4173 max_def_level: i16,
4174 max_rep_level: i16,
4175 path: ColumnPath,
4176 ) -> ColumnDescriptor {
4177 let name = path.string();
4178 let tpe = SchemaType::primitive_type_builder(&name, T::get_physical_type())
4179 .with_length(1)
4182 .build()
4183 .unwrap();
4184 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4185 }
4186
    /// Writes `data` as an INT32 column into a temp file, then reads the file
    /// back page by page and returns the number of values in each data page.
    fn write_and_collect_page_values(
        path: ColumnPath,
        props: WriterPropertiesPtr,
        data: &[i32],
    ) -> Vec<u32> {
        let mut file = tempfile::tempfile().unwrap();
        let mut write = TrackedWrite::new(&mut file);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let mut writer =
            get_test_column_writer_with_path::<Int32Type>(page_writer, 0, 0, props, path);
        writer.write_batch(data, None, None).unwrap();
        let r = writer.close().unwrap();

        // Release the borrow on `file` so the reader can take ownership.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let mut page_reader = Box::new(
            SerializedPageReader::new_with_properties(
                Arc::new(file),
                &r.metadata,
                r.rows_written as usize,
                None,
                Arc::new(props),
            )
            .unwrap(),
        );

        // Every page is expected to be a V1 data page (no dictionary page).
        let mut values_per_page = Vec::new();
        while let Some(page) = page_reader.get_next_page().unwrap() {
            assert_eq!(page.page_type(), PageType::DATA_PAGE);
            values_per_page.push(page.num_values());
        }

        values_per_page
    }
4224
    /// Returns a page writer that records page sizes but discards the bytes.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }
4229
4230 struct TestPageWriter {}
4231
    impl PageWriter for TestPageWriter {
        /// "Writes" the page by copying its sizes into a spec; the page bytes
        /// themselves are dropped and the offset is always reported as 0.
        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
            let mut res = PageWriteSpec::new();
            res.page_type = page.page_type();
            res.uncompressed_size = page.uncompressed_size();
            res.compressed_size = page.compressed_size();
            res.num_values = page.num_values();
            res.offset = 0;
            res.bytes_written = page.data().len() as u64;
            Ok(res)
        }

        /// Nothing to flush or release.
        fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }
4248
4249 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4251 let page_writer = get_test_page_writer();
4252 let props = Default::default();
4253 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4254 writer.write_batch(values, None, None).unwrap();
4255
4256 let metadata = writer.close().unwrap().metadata;
4257 if let Some(stats) = metadata.statistics() {
4258 stats.clone()
4259 } else {
4260 panic!("metadata missing statistics");
4261 }
4262 }
4263
4264 fn get_test_decimals_column_writer<T: DataType>(
4266 page_writer: Box<dyn PageWriter>,
4267 max_def_level: i16,
4268 max_rep_level: i16,
4269 props: WriterPropertiesPtr,
4270 ) -> ColumnWriterImpl<'static, T> {
4271 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4272 max_def_level,
4273 max_rep_level,
4274 ));
4275 let column_writer = get_column_writer(descr, props, page_writer);
4276 get_typed_column_writer::<T>(column_writer)
4277 }
4278
    /// Descriptor for a Decimal(precision=3, scale=2) test column.
    // NOTE(review): the 16-byte length (relevant only for FLBA physical types)
    // is far larger than precision 3 requires — presumably intentional for the
    // tests using it; confirm before reusing elsewhere.
    fn get_test_decimals_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(16)
            .with_logical_type(Some(LogicalType::Decimal {
                scale: 2,
                precision: 3,
            }))
            .with_scale(2)
            .with_precision(3)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
4297
4298 fn float16_statistics_roundtrip(
4299 values: &[FixedLenByteArray],
4300 ) -> ValueStatistics<FixedLenByteArray> {
4301 let page_writer = get_test_page_writer();
4302 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4303 writer.write_batch(values, None, None).unwrap();
4304
4305 let metadata = writer.close().unwrap().metadata;
4306 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4307 stats.clone()
4308 } else {
4309 panic!("metadata missing statistics");
4310 }
4311 }
4312
4313 fn get_test_float16_column_writer(
4314 page_writer: Box<dyn PageWriter>,
4315 props: WriterPropertiesPtr,
4316 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4317 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4318 let column_writer = get_column_writer(descr, props, page_writer);
4319 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4320 }
4321
4322 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4323 let path = ColumnPath::from("col");
4324 let tpe =
4325 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4326 .with_length(2)
4327 .with_logical_type(Some(LogicalType::Float16))
4328 .build()
4329 .unwrap();
4330 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4331 }
4332
4333 fn get_test_interval_column_writer(
4334 page_writer: Box<dyn PageWriter>,
4335 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4336 let descr = Arc::new(get_test_interval_column_descr());
4337 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4338 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4339 }
4340
4341 fn get_test_interval_column_descr() -> ColumnDescriptor {
4342 let path = ColumnPath::from("col");
4343 let tpe =
4344 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4345 .with_length(12)
4346 .with_converted_type(ConvertedType::INTERVAL)
4347 .build()
4348 .unwrap();
4349 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4350 }
4351
4352 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4354 page_writer: Box<dyn PageWriter + 'a>,
4355 max_def_level: i16,
4356 max_rep_level: i16,
4357 props: WriterPropertiesPtr,
4358 ) -> ColumnWriterImpl<'a, T> {
4359 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4360 max_def_level,
4361 max_rep_level,
4362 ));
4363 let column_writer = get_column_writer(descr, props, page_writer);
4364 get_typed_column_writer::<T>(column_writer)
4365 }
4366
4367 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4369 max_def_level: i16,
4370 max_rep_level: i16,
4371 ) -> ColumnDescriptor {
4372 let path = ColumnPath::from("col");
4373 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4374 .with_converted_type(ConvertedType::UINT_32)
4375 .build()
4376 .unwrap();
4377 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4378 }
4379
    #[test]
    fn test_page_v2_snappy_compression_fallback() {
        // NOTE(review): this exercises the V2 data-page path where, if snappy
        // compression does not shrink the page, the page is stored uncompressed
        // — hence the final size equality check. Confirm against the writer's
        // compression-fallback logic.
        let page_writer = TestPageWriter {};

        let props = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .set_dictionary_enabled(false)
            .set_compression(Compression::SNAPPY)
            .build();

        let mut column_writer =
            get_test_column_writer::<ByteArrayType>(Box::new(page_writer), 0, 0, Arc::new(props));

        // A single tiny value: snappy cannot compress this smaller than the input.
        let values = vec![ByteArray::from("a")];

        column_writer.write_batch(&values, None, None).unwrap();

        let result = column_writer.close().unwrap();
        // With the fallback taken, compressed and uncompressed sizes are equal.
        assert_eq!(
            result.metadata.uncompressed_size(),
            result.metadata.compressed_size()
        );
    }
4408}