1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
40 OffsetIndexBuilder,
41};
42use crate::file::page_encoding_stats::PageEncodingStats;
43use crate::file::properties::{
44 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
45};
46use crate::file::statistics::{Statistics, ValueStatistics};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
/// Dispatches over every `ColumnWriter` variant.
///
/// `$writer` is the enum value (or a reference to it), `$typed` is the identifier the
/// wrapped typed writer is bound to, and `$body` is the expression evaluated with that
/// binding. The arms are spelled with `Self::`, so the macro has to be expanded inside an
/// `impl` of the enum.
macro_rules! downcast_writer {
    ($writer:expr, $typed:ident, $body:expr) => {
        match $writer {
            Self::BoolColumnWriter($typed) => $body,
            Self::Int32ColumnWriter($typed) => $body,
            Self::Int64ColumnWriter($typed) => $body,
            Self::Int96ColumnWriter($typed) => $body,
            Self::FloatColumnWriter($typed) => $body,
            Self::DoubleColumnWriter($typed) => $body,
            Self::ByteArrayColumnWriter($typed) => $body,
            Self::FixedLenByteArrayColumnWriter($typed) => $body,
        }
    };
}
65
66pub enum ColumnWriter<'a> {
68 BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
70 Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
72 Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
74 Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
76 FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
78 DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
80 ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
82 FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
84}
85
86impl ColumnWriter<'_> {
87 #[cfg(feature = "arrow")]
89 pub(crate) fn memory_size(&self) -> usize {
90 downcast_writer!(self, typed, typed.memory_size())
91 }
92
93 #[cfg(feature = "arrow")]
95 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97 }
98
99 pub fn close(self) -> Result<ColumnCloseResult> {
101 downcast_writer!(self, typed, typed.close())
102 }
103}
104
105pub fn get_column_writer<'a>(
107 descr: ColumnDescPtr,
108 props: WriterPropertiesPtr,
109 page_writer: Box<dyn PageWriter + 'a>,
110) -> ColumnWriter<'a> {
111 match descr.physical_type() {
112 Type::BOOLEAN => {
113 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
114 }
115 Type::INT32 => {
116 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
117 }
118 Type::INT64 => {
119 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
120 }
121 Type::INT96 => {
122 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
123 }
124 Type::FLOAT => {
125 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
126 }
127 Type::DOUBLE => {
128 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
129 }
130 Type::BYTE_ARRAY => {
131 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
132 }
133 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
134 ColumnWriterImpl::new(descr, props, page_writer),
135 ),
136 }
137}
138
139pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
144 T::get_column_writer(col_writer).unwrap_or_else(|| {
145 panic!(
146 "Failed to convert column writer into a typed column writer for `{}` type",
147 T::get_physical_type()
148 )
149 })
150}
151
152pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
154 col_writer: &'b ColumnWriter<'a>,
155) -> &'b ColumnWriterImpl<'a, T> {
156 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
157 panic!(
158 "Failed to convert column writer into a typed column writer for `{}` type",
159 T::get_physical_type()
160 )
161 })
162}
163
164pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
166 col_writer: &'a mut ColumnWriter<'b>,
167) -> &'a mut ColumnWriterImpl<'b, T> {
168 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
169 panic!(
170 "Failed to convert column writer into a typed column writer for `{}` type",
171 T::get_physical_type()
172 )
173 })
174}
175
176#[derive(Debug, Clone)]
178pub struct ColumnCloseResult {
179 pub bytes_written: u64,
181 pub rows_written: u64,
183 pub metadata: ColumnChunkMetaData,
185 pub bloom_filter: Option<Sbbf>,
187 pub column_index: Option<ColumnIndex>,
189 pub offset_index: Option<OffsetIndex>,
191}
192
193#[derive(Default)]
195struct PageMetrics {
196 num_buffered_values: u32,
197 num_buffered_rows: u32,
198 num_page_nulls: u64,
199 repetition_level_histogram: Option<LevelHistogram>,
200 definition_level_histogram: Option<LevelHistogram>,
201}
202
203impl PageMetrics {
204 fn new() -> Self {
205 Default::default()
206 }
207
208 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
210 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
211 self
212 }
213
214 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
216 self.definition_level_histogram = LevelHistogram::try_new(max_level);
217 self
218 }
219
220 fn new_page(&mut self) {
223 self.num_buffered_values = 0;
224 self.num_buffered_rows = 0;
225 self.num_page_nulls = 0;
226 self.repetition_level_histogram
227 .as_mut()
228 .map(LevelHistogram::reset);
229 self.definition_level_histogram
230 .as_mut()
231 .map(LevelHistogram::reset);
232 }
233
234 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
236 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
237 rep_hist.update_from_levels(levels);
238 }
239 }
240
241 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
243 if let Some(ref mut def_hist) = self.definition_level_histogram {
244 def_hist.update_from_levels(levels);
245 }
246 }
247}
248
249#[derive(Default)]
251struct ColumnMetrics<T: Default> {
252 total_bytes_written: u64,
253 total_rows_written: u64,
254 total_uncompressed_size: u64,
255 total_compressed_size: u64,
256 total_num_values: u64,
257 dictionary_page_offset: Option<u64>,
258 data_page_offset: Option<u64>,
259 min_column_value: Option<T>,
260 max_column_value: Option<T>,
261 num_column_nulls: u64,
262 column_distinct_count: Option<u64>,
263 variable_length_bytes: Option<i64>,
264 repetition_level_histogram: Option<LevelHistogram>,
265 definition_level_histogram: Option<LevelHistogram>,
266}
267
268impl<T: Default> ColumnMetrics<T> {
269 fn new() -> Self {
270 Default::default()
271 }
272
273 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
275 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
276 self
277 }
278
279 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
281 self.definition_level_histogram = LevelHistogram::try_new(max_level);
282 self
283 }
284
285 fn update_histogram(
287 chunk_histogram: &mut Option<LevelHistogram>,
288 page_histogram: &Option<LevelHistogram>,
289 ) {
290 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
291 chunk_hist.add(page_hist);
292 }
293 }
294
295 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
298 ColumnMetrics::<T>::update_histogram(
299 &mut self.definition_level_histogram,
300 &page_metrics.definition_level_histogram,
301 );
302 ColumnMetrics::<T>::update_histogram(
303 &mut self.repetition_level_histogram,
304 &page_metrics.repetition_level_histogram,
305 );
306 }
307
308 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
310 if let Some(var_bytes) = variable_length_bytes {
311 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
312 }
313 }
314}
315
316pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
318
319pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
321 descr: ColumnDescPtr,
323 props: WriterPropertiesPtr,
324 statistics_enabled: EnabledStatistics,
325
326 page_writer: Box<dyn PageWriter + 'a>,
327 codec: Compression,
328 compressor: Option<Box<dyn Codec>>,
329 encoder: E,
330
331 page_metrics: PageMetrics,
332 column_metrics: ColumnMetrics<E::T>,
334
335 encodings: BTreeSet<Encoding>,
338 encoding_stats: Vec<PageEncodingStats>,
339 def_levels_sink: Vec<i16>,
341 rep_levels_sink: Vec<i16>,
342 data_pages: VecDeque<CompressedPage>,
343 column_index_builder: ColumnIndexBuilder,
345 offset_index_builder: Option<OffsetIndexBuilder>,
346
347 data_page_boundary_ascending: bool,
350 data_page_boundary_descending: bool,
351 last_non_null_data_page_min_max: Option<(E::T, E::T)>,
353}
354
355impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
356 pub fn new(
358 descr: ColumnDescPtr,
359 props: WriterPropertiesPtr,
360 page_writer: Box<dyn PageWriter + 'a>,
361 ) -> Self {
362 let codec = props.compression(descr.path());
363 let codec_options = CodecOptionsBuilder::default().build();
364 let compressor = create_codec(codec, &codec_options).unwrap();
365 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
366
367 let statistics_enabled = props.statistics_enabled(descr.path());
368
369 let mut encodings = BTreeSet::new();
370 encodings.insert(Encoding::RLE);
372
373 let mut page_metrics = PageMetrics::new();
374 let mut column_metrics = ColumnMetrics::<E::T>::new();
375
376 if statistics_enabled != EnabledStatistics::None {
378 page_metrics = page_metrics
379 .with_repetition_level_histogram(descr.max_rep_level())
380 .with_definition_level_histogram(descr.max_def_level());
381 column_metrics = column_metrics
382 .with_repetition_level_histogram(descr.max_rep_level())
383 .with_definition_level_histogram(descr.max_def_level())
384 }
385
386 let mut column_index_builder = ColumnIndexBuilder::new();
388 if statistics_enabled != EnabledStatistics::Page {
389 column_index_builder.to_invalid()
390 }
391
392 let offset_index_builder = match props.offset_index_disabled() {
394 false => Some(OffsetIndexBuilder::new()),
395 _ => None,
396 };
397
398 Self {
399 descr,
400 props,
401 statistics_enabled,
402 page_writer,
403 codec,
404 compressor,
405 encoder,
406 def_levels_sink: vec![],
407 rep_levels_sink: vec![],
408 data_pages: VecDeque::new(),
409 page_metrics,
410 column_metrics,
411 column_index_builder,
412 offset_index_builder,
413 encodings,
414 encoding_stats: vec![],
415 data_page_boundary_ascending: true,
416 data_page_boundary_descending: true,
417 last_non_null_data_page_min_max: None,
418 }
419 }
420
421 #[allow(clippy::too_many_arguments)]
422 pub(crate) fn write_batch_internal(
423 &mut self,
424 values: &E::Values,
425 value_indices: Option<&[usize]>,
426 def_levels: Option<&[i16]>,
427 rep_levels: Option<&[i16]>,
428 min: Option<&E::T>,
429 max: Option<&E::T>,
430 distinct_count: Option<u64>,
431 ) -> Result<usize> {
432 if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
434 if def.len() != rep.len() {
435 return Err(general_err!(
436 "Inconsistent length of definition and repetition levels: {} != {}",
437 def.len(),
438 rep.len()
439 ));
440 }
441 }
442
443 let num_levels = match def_levels {
454 Some(def_levels) => def_levels.len(),
455 None => values.len(),
456 };
457
458 if let Some(min) = min {
459 update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
460 }
461 if let Some(max) = max {
462 update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
463 }
464
465 if self.encoder.num_values() == 0 {
467 self.column_metrics.column_distinct_count = distinct_count;
468 } else {
469 self.column_metrics.column_distinct_count = None;
470 }
471
472 let mut values_offset = 0;
473 let mut levels_offset = 0;
474 let base_batch_size = self.props.write_batch_size();
475 while levels_offset < num_levels {
476 let mut end_offset = num_levels.min(levels_offset + base_batch_size);
477
478 if let Some(r) = rep_levels {
480 while end_offset < r.len() && r[end_offset] != 0 {
481 end_offset += 1;
482 }
483 }
484
485 values_offset += self.write_mini_batch(
486 values,
487 values_offset,
488 value_indices,
489 end_offset - levels_offset,
490 def_levels.map(|lv| &lv[levels_offset..end_offset]),
491 rep_levels.map(|lv| &lv[levels_offset..end_offset]),
492 )?;
493 levels_offset = end_offset;
494 }
495
496 Ok(values_offset)
498 }
499
500 pub fn write_batch(
513 &mut self,
514 values: &E::Values,
515 def_levels: Option<&[i16]>,
516 rep_levels: Option<&[i16]>,
517 ) -> Result<usize> {
518 self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
519 }
520
521 pub fn write_batch_with_statistics(
529 &mut self,
530 values: &E::Values,
531 def_levels: Option<&[i16]>,
532 rep_levels: Option<&[i16]>,
533 min: Option<&E::T>,
534 max: Option<&E::T>,
535 distinct_count: Option<u64>,
536 ) -> Result<usize> {
537 self.write_batch_internal(
538 values,
539 None,
540 def_levels,
541 rep_levels,
542 min,
543 max,
544 distinct_count,
545 )
546 }
547
/// Returns the bytes written so far plus the encoder's estimated memory size.
#[cfg(feature = "arrow")]
pub(crate) fn memory_size(&self) -> usize {
    let written = self.column_metrics.total_bytes_written as usize;
    written + self.encoder.estimated_memory_size()
}
556
557 pub fn get_total_bytes_written(&self) -> u64 {
563 self.column_metrics.total_bytes_written
564 }
565
/// Estimates the total size of this column chunk: buffered pages, bytes already
/// written, and the encoder's estimated data page and dictionary page sizes.
#[cfg(feature = "arrow")]
pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
    let buffered_pages: u64 = self
        .data_pages
        .iter()
        .map(|page| page.data().len() as u64)
        .sum();
    let pending_data_page = self.encoder.estimated_data_page_size() as u64;
    let pending_dict_page = self.encoder.estimated_dict_page_size().unwrap_or_default() as u64;

    buffered_pages
        + self.column_metrics.total_bytes_written
        + pending_data_page
        + pending_dict_page
}
581
582 pub fn get_total_rows_written(&self) -> u64 {
585 self.column_metrics.total_rows_written
586 }
587
588 pub fn get_descriptor(&self) -> &ColumnDescPtr {
590 &self.descr
591 }
592
593 pub fn close(mut self) -> Result<ColumnCloseResult> {
596 if self.page_metrics.num_buffered_values > 0 {
597 self.add_data_page()?;
598 }
599 if self.encoder.has_dictionary() {
600 self.write_dictionary_page()?;
601 }
602 self.flush_data_pages()?;
603 let metadata = self.build_column_metadata()?;
604 self.page_writer.close()?;
605
606 let boundary_order = match (
607 self.data_page_boundary_ascending,
608 self.data_page_boundary_descending,
609 ) {
610 (true, _) => BoundaryOrder::ASCENDING,
613 (false, true) => BoundaryOrder::DESCENDING,
614 (false, false) => BoundaryOrder::UNORDERED,
615 };
616 self.column_index_builder.set_boundary_order(boundary_order);
617
618 let column_index = self
619 .column_index_builder
620 .valid()
621 .then(|| self.column_index_builder.build_to_thrift());
622
623 let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
624
625 Ok(ColumnCloseResult {
626 bytes_written: self.column_metrics.total_bytes_written,
627 rows_written: self.column_metrics.total_rows_written,
628 bloom_filter: self.encoder.flush_bloom_filter(),
629 metadata,
630 column_index,
631 offset_index,
632 })
633 }
634
635 fn write_mini_batch(
639 &mut self,
640 values: &E::Values,
641 values_offset: usize,
642 value_indices: Option<&[usize]>,
643 num_levels: usize,
644 def_levels: Option<&[i16]>,
645 rep_levels: Option<&[i16]>,
646 ) -> Result<usize> {
647 let values_to_write = if self.descr.max_def_level() > 0 {
649 let levels = def_levels.ok_or_else(|| {
650 general_err!(
651 "Definition levels are required, because max definition level = {}",
652 self.descr.max_def_level()
653 )
654 })?;
655
656 let mut values_to_write = 0;
657 for &level in levels {
658 if level == self.descr.max_def_level() {
659 values_to_write += 1;
660 } else {
661 self.page_metrics.num_page_nulls += 1
663 }
664 }
665
666 self.page_metrics.update_definition_level_histogram(levels);
668
669 self.def_levels_sink.extend_from_slice(levels);
670 values_to_write
671 } else {
672 num_levels
673 };
674
675 if self.descr.max_rep_level() > 0 {
677 let levels = rep_levels.ok_or_else(|| {
679 general_err!(
680 "Repetition levels are required, because max repetition level = {}",
681 self.descr.max_rep_level()
682 )
683 })?;
684
685 if !levels.is_empty() && levels[0] != 0 {
686 return Err(general_err!(
687 "Write must start at a record boundary, got non-zero repetition level of {}",
688 levels[0]
689 ));
690 }
691
692 for &level in levels {
694 self.page_metrics.num_buffered_rows += (level == 0) as u32
695 }
696
697 self.page_metrics.update_repetition_level_histogram(levels);
699
700 self.rep_levels_sink.extend_from_slice(levels);
701 } else {
702 self.page_metrics.num_buffered_rows += num_levels as u32;
705 }
706
707 match value_indices {
708 Some(indices) => {
709 let indices = &indices[values_offset..values_offset + values_to_write];
710 self.encoder.write_gather(values, indices)?;
711 }
712 None => self.encoder.write(values, values_offset, values_to_write)?,
713 }
714
715 self.page_metrics.num_buffered_values += num_levels as u32;
716
717 if self.should_add_data_page() {
718 self.add_data_page()?;
719 }
720
721 if self.should_dict_fallback() {
722 self.dict_fallback()?;
723 }
724
725 Ok(values_to_write)
726 }
727
728 #[inline]
733 fn should_dict_fallback(&self) -> bool {
734 match self.encoder.estimated_dict_page_size() {
735 Some(size) => {
736 size >= self
737 .props
738 .column_dictionary_page_size_limit(self.descr.path())
739 }
740 None => false,
741 }
742 }
743
744 #[inline]
746 fn should_add_data_page(&self) -> bool {
747 if self.page_metrics.num_buffered_values == 0 {
752 return false;
753 }
754
755 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
756 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
757 }
758
759 fn dict_fallback(&mut self) -> Result<()> {
762 if self.page_metrics.num_buffered_values > 0 {
764 self.add_data_page()?;
765 }
766 self.write_dictionary_page()?;
767 self.flush_data_pages()?;
768 Ok(())
769 }
770
771 fn update_column_offset_index(
773 &mut self,
774 page_statistics: Option<&ValueStatistics<E::T>>,
775 page_variable_length_bytes: Option<i64>,
776 ) {
777 let null_page =
779 (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
780 if null_page && self.column_index_builder.valid() {
783 self.column_index_builder.append(
784 null_page,
785 vec![],
786 vec![],
787 self.page_metrics.num_page_nulls as i64,
788 );
789 } else if self.column_index_builder.valid() {
790 match &page_statistics {
793 None => {
794 self.column_index_builder.to_invalid();
795 }
796 Some(stat) => {
797 let new_min = stat.min_opt().unwrap();
799 let new_max = stat.max_opt().unwrap();
800 if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
801 if self.data_page_boundary_ascending {
802 let not_ascending = compare_greater(&self.descr, last_min, new_min)
804 || compare_greater(&self.descr, last_max, new_max);
805 if not_ascending {
806 self.data_page_boundary_ascending = false;
807 }
808 }
809
810 if self.data_page_boundary_descending {
811 let not_descending = compare_greater(&self.descr, new_min, last_min)
813 || compare_greater(&self.descr, new_max, last_max);
814 if not_descending {
815 self.data_page_boundary_descending = false;
816 }
817 }
818 }
819 self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
820
821 if self.can_truncate_value() {
822 self.column_index_builder.append(
823 null_page,
824 self.truncate_min_value(
825 self.props.column_index_truncate_length(),
826 stat.min_bytes_opt().unwrap(),
827 )
828 .0,
829 self.truncate_max_value(
830 self.props.column_index_truncate_length(),
831 stat.max_bytes_opt().unwrap(),
832 )
833 .0,
834 self.page_metrics.num_page_nulls as i64,
835 );
836 } else {
837 self.column_index_builder.append(
838 null_page,
839 stat.min_bytes_opt().unwrap().to_vec(),
840 stat.max_bytes_opt().unwrap().to_vec(),
841 self.page_metrics.num_page_nulls as i64,
842 );
843 }
844 }
845 }
846 }
847
848 self.column_index_builder.append_histograms(
850 &self.page_metrics.repetition_level_histogram,
851 &self.page_metrics.definition_level_histogram,
852 );
853
854 if let Some(builder) = self.offset_index_builder.as_mut() {
856 builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
857 builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
858 }
859 }
860
861 fn can_truncate_value(&self) -> bool {
863 match self.descr.physical_type() {
864 Type::FIXED_LEN_BYTE_ARRAY
868 if !matches!(
869 self.descr.logical_type(),
870 Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
871 ) =>
872 {
873 true
874 }
875 Type::BYTE_ARRAY => true,
876 _ => false,
878 }
879 }
880
881 fn is_utf8(&self) -> bool {
883 self.get_descriptor().logical_type() == Some(LogicalType::String)
884 || self.get_descriptor().converted_type() == ConvertedType::UTF8
885 }
886
887 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
898 truncation_length
899 .filter(|l| data.len() > *l)
900 .and_then(|l|
901 if self.is_utf8() {
903 match str::from_utf8(data) {
904 Ok(str_data) => truncate_utf8(str_data, l),
905 Err(_) => Some(data[..l].to_vec()),
906 }
907 } else {
908 Some(data[..l].to_vec())
909 }
910 )
911 .map(|truncated| (truncated, true))
912 .unwrap_or_else(|| (data.to_vec(), false))
913 }
914
915 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
929 truncation_length
930 .filter(|l| data.len() > *l)
931 .and_then(|l|
932 if self.is_utf8() {
934 match str::from_utf8(data) {
935 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
936 Err(_) => increment(data[..l].to_vec()),
937 }
938 } else {
939 increment(data[..l].to_vec())
940 }
941 )
942 .map(|truncated| (truncated, true))
943 .unwrap_or_else(|| (data.to_vec(), false))
944 }
945
946 fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
949 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
950 match statistics {
951 Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
952 let (min, did_truncate_min) = self.truncate_min_value(
953 self.props.statistics_truncate_length(),
954 stats.min_bytes_opt().unwrap(),
955 );
956 let (max, did_truncate_max) = self.truncate_max_value(
957 self.props.statistics_truncate_length(),
958 stats.max_bytes_opt().unwrap(),
959 );
960 Statistics::ByteArray(
961 ValueStatistics::new(
962 Some(min.into()),
963 Some(max.into()),
964 stats.distinct_count(),
965 stats.null_count_opt(),
966 backwards_compatible_min_max,
967 )
968 .with_max_is_exact(!did_truncate_max)
969 .with_min_is_exact(!did_truncate_min),
970 )
971 }
972 Statistics::FixedLenByteArray(stats)
973 if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
974 {
975 let (min, did_truncate_min) = self.truncate_min_value(
976 self.props.statistics_truncate_length(),
977 stats.min_bytes_opt().unwrap(),
978 );
979 let (max, did_truncate_max) = self.truncate_max_value(
980 self.props.statistics_truncate_length(),
981 stats.max_bytes_opt().unwrap(),
982 );
983 Statistics::FixedLenByteArray(
984 ValueStatistics::new(
985 Some(min.into()),
986 Some(max.into()),
987 stats.distinct_count(),
988 stats.null_count_opt(),
989 backwards_compatible_min_max,
990 )
991 .with_max_is_exact(!did_truncate_max)
992 .with_min_is_exact(!did_truncate_min),
993 )
994 }
995 stats => stats,
996 }
997 }
998
999 fn add_data_page(&mut self) -> Result<()> {
1002 let values_data = self.encoder.flush_data_page()?;
1004
1005 let max_def_level = self.descr.max_def_level();
1006 let max_rep_level = self.descr.max_rep_level();
1007
1008 self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
1009
1010 let page_statistics = match (values_data.min_value, values_data.max_value) {
1011 (Some(min), Some(max)) => {
1012 update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
1014 update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
1015
1016 (self.statistics_enabled == EnabledStatistics::Page).then_some(
1017 ValueStatistics::new(
1018 Some(min),
1019 Some(max),
1020 None,
1021 Some(self.page_metrics.num_page_nulls),
1022 false,
1023 ),
1024 )
1025 }
1026 _ => None,
1027 };
1028
1029 self.update_column_offset_index(
1031 page_statistics.as_ref(),
1032 values_data.variable_length_bytes,
1033 );
1034
1035 self.column_metrics
1037 .update_from_page_metrics(&self.page_metrics);
1038 self.column_metrics
1039 .update_variable_length_bytes(values_data.variable_length_bytes);
1040
1041 let page_statistics = page_statistics
1043 .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
1044 .map(|stats| self.truncate_statistics(Statistics::from(stats)));
1045
1046 let compressed_page = match self.props.writer_version() {
1047 WriterVersion::PARQUET_1_0 => {
1048 let mut buffer = vec![];
1049
1050 if max_rep_level > 0 {
1051 buffer.extend_from_slice(
1052 &self.encode_levels_v1(
1053 Encoding::RLE,
1054 &self.rep_levels_sink[..],
1055 max_rep_level,
1056 )[..],
1057 );
1058 }
1059
1060 if max_def_level > 0 {
1061 buffer.extend_from_slice(
1062 &self.encode_levels_v1(
1063 Encoding::RLE,
1064 &self.def_levels_sink[..],
1065 max_def_level,
1066 )[..],
1067 );
1068 }
1069
1070 buffer.extend_from_slice(&values_data.buf);
1071 let uncompressed_size = buffer.len();
1072
1073 if let Some(ref mut cmpr) = self.compressor {
1074 let mut compressed_buf = Vec::with_capacity(uncompressed_size);
1075 cmpr.compress(&buffer[..], &mut compressed_buf)?;
1076 buffer = compressed_buf;
1077 }
1078
1079 let data_page = Page::DataPage {
1080 buf: buffer.into(),
1081 num_values: self.page_metrics.num_buffered_values,
1082 encoding: values_data.encoding,
1083 def_level_encoding: Encoding::RLE,
1084 rep_level_encoding: Encoding::RLE,
1085 statistics: page_statistics,
1086 };
1087
1088 CompressedPage::new(data_page, uncompressed_size)
1089 }
1090 WriterVersion::PARQUET_2_0 => {
1091 let mut rep_levels_byte_len = 0;
1092 let mut def_levels_byte_len = 0;
1093 let mut buffer = vec![];
1094
1095 if max_rep_level > 0 {
1096 let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
1097 rep_levels_byte_len = levels.len();
1098 buffer.extend_from_slice(&levels[..]);
1099 }
1100
1101 if max_def_level > 0 {
1102 let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
1103 def_levels_byte_len = levels.len();
1104 buffer.extend_from_slice(&levels[..]);
1105 }
1106
1107 let uncompressed_size =
1108 rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
1109
1110 match self.compressor {
1112 Some(ref mut cmpr) => {
1113 cmpr.compress(&values_data.buf, &mut buffer)?;
1114 }
1115 None => buffer.extend_from_slice(&values_data.buf),
1116 }
1117
1118 let data_page = Page::DataPageV2 {
1119 buf: buffer.into(),
1120 num_values: self.page_metrics.num_buffered_values,
1121 encoding: values_data.encoding,
1122 num_nulls: self.page_metrics.num_page_nulls as u32,
1123 num_rows: self.page_metrics.num_buffered_rows,
1124 def_levels_byte_len: def_levels_byte_len as u32,
1125 rep_levels_byte_len: rep_levels_byte_len as u32,
1126 is_compressed: self.compressor.is_some(),
1127 statistics: page_statistics,
1128 };
1129
1130 CompressedPage::new(data_page, uncompressed_size)
1131 }
1132 };
1133
1134 if self.encoder.has_dictionary() {
1136 self.data_pages.push_back(compressed_page);
1137 } else {
1138 self.write_data_page(compressed_page)?;
1139 }
1140
1141 self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
1143
1144 self.rep_levels_sink.clear();
1146 self.def_levels_sink.clear();
1147 self.page_metrics.new_page();
1148
1149 Ok(())
1150 }
1151
1152 #[inline]
1155 fn flush_data_pages(&mut self) -> Result<()> {
1156 if self.page_metrics.num_buffered_values > 0 {
1158 self.add_data_page()?;
1159 }
1160
1161 while let Some(page) = self.data_pages.pop_front() {
1162 self.write_data_page(page)?;
1163 }
1164
1165 Ok(())
1166 }
1167
1168 fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1170 let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1171 let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1172 let num_values = self.column_metrics.total_num_values as i64;
1173 let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1174 let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1176
1177 let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1178 .set_compression(self.codec)
1179 .set_encodings(self.encodings.iter().cloned().collect())
1180 .set_page_encoding_stats(self.encoding_stats.clone())
1181 .set_total_compressed_size(total_compressed_size)
1182 .set_total_uncompressed_size(total_uncompressed_size)
1183 .set_num_values(num_values)
1184 .set_data_page_offset(data_page_offset)
1185 .set_dictionary_page_offset(dict_page_offset);
1186
1187 if self.statistics_enabled != EnabledStatistics::None {
1188 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1189
1190 let statistics = ValueStatistics::<E::T>::new(
1191 self.column_metrics.min_column_value.clone(),
1192 self.column_metrics.max_column_value.clone(),
1193 self.column_metrics.column_distinct_count,
1194 Some(self.column_metrics.num_column_nulls),
1195 false,
1196 )
1197 .with_backwards_compatible_min_max(backwards_compatible_min_max)
1198 .into();
1199
1200 let statistics = self.truncate_statistics(statistics);
1201
1202 builder = builder
1203 .set_statistics(statistics)
1204 .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1205 .set_repetition_level_histogram(
1206 self.column_metrics.repetition_level_histogram.take(),
1207 )
1208 .set_definition_level_histogram(
1209 self.column_metrics.definition_level_histogram.take(),
1210 );
1211 }
1212
1213 builder = self.set_column_chunk_encryption_properties(builder);
1214
1215 let metadata = builder.build()?;
1216 Ok(metadata)
1217 }
1218
1219 #[inline]
1221 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1222 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1223 encoder.put(levels);
1224 encoder.consume()
1225 }
1226
1227 #[inline]
1230 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1231 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1232 encoder.put(levels);
1233 encoder.consume()
1234 }
1235
1236 #[inline]
1238 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1239 self.encodings.insert(page.encoding());
1240 match self.encoding_stats.last_mut() {
1241 Some(encoding_stats)
1242 if encoding_stats.page_type == page.page_type()
1243 && encoding_stats.encoding == page.encoding() =>
1244 {
1245 encoding_stats.count += 1;
1246 }
1247 _ => {
1248 self.encoding_stats.push(PageEncodingStats {
1251 page_type: page.page_type(),
1252 encoding: page.encoding(),
1253 count: 1,
1254 });
1255 }
1256 }
1257 let page_spec = self.page_writer.write_page(page)?;
1258 if let Some(builder) = self.offset_index_builder.as_mut() {
1261 builder
1262 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1263 }
1264 self.update_metrics_for_page(page_spec);
1265 Ok(())
1266 }
1267
1268 #[inline]
1270 fn write_dictionary_page(&mut self) -> Result<()> {
1271 let compressed_page = {
1272 let mut page = self
1273 .encoder
1274 .flush_dict_page()?
1275 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1276
1277 let uncompressed_size = page.buf.len();
1278
1279 if let Some(ref mut cmpr) = self.compressor {
1280 let mut output_buf = Vec::with_capacity(uncompressed_size);
1281 cmpr.compress(&page.buf, &mut output_buf)?;
1282 page.buf = Bytes::from(output_buf);
1283 }
1284
1285 let dict_page = Page::DictionaryPage {
1286 buf: page.buf,
1287 num_values: page.num_values as u32,
1288 encoding: self.props.dictionary_page_encoding(),
1289 is_sorted: page.is_sorted,
1290 };
1291 CompressedPage::new(dict_page, uncompressed_size)
1292 };
1293
1294 self.encodings.insert(compressed_page.encoding());
1295 self.encoding_stats.push(PageEncodingStats {
1296 page_type: PageType::DICTIONARY_PAGE,
1297 encoding: compressed_page.encoding(),
1298 count: 1,
1299 });
1300 let page_spec = self.page_writer.write_page(compressed_page)?;
1301 self.update_metrics_for_page(page_spec);
1302 Ok(())
1304 }
1305
1306 #[inline]
1308 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1309 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1310 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1311 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1312
1313 match page_spec.page_type {
1314 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1315 self.column_metrics.total_num_values += page_spec.num_values as u64;
1316 if self.column_metrics.data_page_offset.is_none() {
1317 self.column_metrics.data_page_offset = Some(page_spec.offset);
1318 }
1319 }
1320 PageType::DICTIONARY_PAGE => {
1321 assert!(
1322 self.column_metrics.dictionary_page_offset.is_none(),
1323 "Dictionary offset is already set"
1324 );
1325 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1326 }
1327 _ => {}
1328 }
1329 }
1330
1331 #[inline]
1332 #[cfg(feature = "encryption")]
1333 fn set_column_chunk_encryption_properties(
1334 &self,
1335 builder: ColumnChunkMetaDataBuilder,
1336 ) -> ColumnChunkMetaDataBuilder {
1337 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1338 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1339 encryption_properties,
1340 &self.descr,
1341 ))
1342 } else {
1343 builder
1344 }
1345 }
1346
    /// Variant compiled when the `encryption` feature is disabled: returns `builder`
    /// unchanged, so callers can invoke this method regardless of the feature.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1355}
1356
1357fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1358 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1359}
1360
1361fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1362 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1363}
1364
1365#[inline]
1366#[allow(clippy::eq_op)]
1367fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1368 match T::PHYSICAL_TYPE {
1369 Type::FLOAT | Type::DOUBLE => val != val,
1370 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1371 let val = val.as_bytes();
1372 let val = f16::from_le_bytes([val[0], val[1]]);
1373 val.is_nan()
1374 }
1375 _ => false,
1376 }
1377}
1378
1379fn update_stat<T: ParquetValueType, F>(
1384 descr: &ColumnDescriptor,
1385 val: &T,
1386 cur: &mut Option<T>,
1387 should_update: F,
1388) where
1389 F: Fn(&T) -> bool,
1390{
1391 if is_nan(descr, val) {
1392 return;
1393 }
1394
1395 if cur.as_ref().map_or(true, should_update) {
1396 *cur = Some(val.clone());
1397 }
1398}
1399
1400fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1402 if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1403 if !is_signed {
1404 return a.as_u64().unwrap() > b.as_u64().unwrap();
1406 }
1407 }
1408
1409 match descr.converted_type() {
1410 ConvertedType::UINT_8
1411 | ConvertedType::UINT_16
1412 | ConvertedType::UINT_32
1413 | ConvertedType::UINT_64 => {
1414 return a.as_u64().unwrap() > b.as_u64().unwrap();
1415 }
1416 _ => {}
1417 };
1418
1419 if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1420 match T::PHYSICAL_TYPE {
1421 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1422 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1423 }
1424 _ => {}
1425 };
1426 }
1427
1428 if descr.converted_type() == ConvertedType::DECIMAL {
1429 match T::PHYSICAL_TYPE {
1430 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1431 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1432 }
1433 _ => {}
1434 };
1435 };
1436
1437 if let Some(LogicalType::Float16) = descr.logical_type() {
1438 let a = a.as_bytes();
1439 let a = f16::from_le_bytes([a[0], a[1]]);
1440 let b = b.as_bytes();
1441 let b = f16::from_le_bytes([b[0], b[1]]);
1442 return a > b;
1443 }
1444
1445 a > b
1446}
1447
1448fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1456 match (kind, props.writer_version()) {
1457 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1458 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1459 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1460 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1461 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1462 _ => Encoding::PLAIN,
1463 }
1464}
1465
1466fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1468 match (kind, props.writer_version()) {
1469 (Type::BOOLEAN, _) => false,
1471 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1473 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1474 _ => true,
1475 }
1476}
1477
/// Signed comparison of two big-endian two's-complement decimals: returns `true` when `a`
/// is strictly greater than `b`.
///
/// The operands may have different lengths; the shorter one is treated as if it were
/// sign-extended to the length of the longer one. An empty slice compares as smaller than
/// any non-empty slice, and two empty slices are not greater than each other.
///
/// Unequal-length operands are aligned before the final byte-wise comparison. Comparing
/// `a[1..]` with `b[1..]` directly misorders values whose extra leading bytes are pure sign
/// extension, e.g. `[0x00, 0x00, 0x09]` (9) against `[0x00, 0x07]` (7).
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    if a.is_empty() || b.is_empty() {
        return !a.is_empty();
    }

    let first_a = a[0];
    let first_b = b[0];

    // Short circuit when the signs differ, or when equal-length operands already differ in
    // their first byte. The equal-length requirement matters because a longer operand may
    // start with sign-extension bytes that say nothing about its magnitude.
    if (0x80 & first_a) != (0x80 & first_b) || (a.len() == b.len() && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both operands have the same sign.
    let negative = (first_a as i8) < 0;
    let extension: u8 = if negative { 0xFF } else { 0 };
    let a_longer = a.len() > b.len();

    // Split the extra leading bytes off the longer operand so that both tails have the
    // same width. For equal lengths the lead is empty.
    let (lead, a_tail, b_tail) = if a_longer {
        let (lead, tail) = a.split_at(a.len() - b.len());
        (lead, tail, b)
    } else {
        let (lead, tail) = b.split_at(b.len() - a.len());
        (lead, a, tail)
    };

    // A lead byte that is not sign extension means the longer operand has the larger
    // magnitude: greater when positive, smaller when negative.
    if lead.iter().any(|&byte| byte != extension) {
        return negative != a_longer;
    }

    // Same sign and same width: unsigned lexicographic order matches numeric order.
    a_tail > b_tail
}
1523
/// Returns the longest prefix of `data` that is at most `length` bytes long, is not empty,
/// and ends on a UTF-8 character boundary.
///
/// Returns `None` when no such prefix exists, e.g. when `length` is zero or smaller than
/// the first character.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    (1..=length)
        .rev()
        .find(|&end| data.is_char_boundary(end))
        .map(|end| data.as_bytes()[..end].to_vec())
}
1533
1534fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1540 let lower_bound = length.saturating_sub(3);
1542 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1543 increment_utf8(data.get(..split)?)
1544}
1545
/// Produces bytes that sort after `data` by bumping the last character that can be bumped.
///
/// Characters are tried from the end. A character qualifies when the next code point is a
/// valid `char` with the same UTF-8 width. The result is `data` up to and including that
/// character, with the character replaced by its successor; anything after it is dropped.
/// Returns `None` when no character qualifies (including for an empty string).
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        // Reject successors that are invalid or would change the encoded width.
        let successor = char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;

        let mut bytes = data.as_bytes()[..start + width].to_vec();
        successor.encode_utf8(&mut bytes[start..]);
        Some(bytes)
    })
}
1567
/// Treats `data` as a big-endian unsigned number and adds one to it.
///
/// Returns `None` when the addition overflows, i.e. when every byte is `0xFF` or `data` is
/// empty.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The rightmost byte that can absorb the carry.
    let pivot = data.iter().rposition(|&byte| byte != u8::MAX)?;

    data[pivot] += 1;
    // Every byte to the right of the pivot was 0xFF and wraps to zero.
    data[pivot + 1..].fill(0);
    Some(data)
}
1583
1584#[cfg(test)]
1585mod tests {
1586 use crate::{
1587 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1588 schema::parser::parse_message_type,
1589 };
1590 use core::str;
1591 use rand::distr::uniform::SampleUniform;
1592 use std::{fs::File, sync::Arc};
1593
1594 use crate::column::{
1595 page::PageReader,
1596 reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1597 };
1598 use crate::file::writer::TrackedWrite;
1599 use crate::file::{
1600 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1601 };
1602 use crate::schema::types::{ColumnPath, Type as SchemaType};
1603 use crate::util::test_common::rand_gen::random_numbers_range;
1604
1605 use super::*;
1606
1607 #[test]
1608 fn test_column_writer_inconsistent_def_rep_length() {
1609 let page_writer = get_test_page_writer();
1610 let props = Default::default();
1611 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1612 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1613 assert!(res.is_err());
1614 if let Err(err) = res {
1615 assert_eq!(
1616 format!("{err}"),
1617 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1618 );
1619 }
1620 }
1621
1622 #[test]
1623 fn test_column_writer_invalid_def_levels() {
1624 let page_writer = get_test_page_writer();
1625 let props = Default::default();
1626 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1627 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1628 assert!(res.is_err());
1629 if let Err(err) = res {
1630 assert_eq!(
1631 format!("{err}"),
1632 "Parquet error: Definition levels are required, because max definition level = 1"
1633 );
1634 }
1635 }
1636
1637 #[test]
1638 fn test_column_writer_invalid_rep_levels() {
1639 let page_writer = get_test_page_writer();
1640 let props = Default::default();
1641 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1642 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1643 assert!(res.is_err());
1644 if let Err(err) = res {
1645 assert_eq!(
1646 format!("{err}"),
1647 "Parquet error: Repetition levels are required, because max repetition level = 1"
1648 );
1649 }
1650 }
1651
1652 #[test]
1653 fn test_column_writer_not_enough_values_to_write() {
1654 let page_writer = get_test_page_writer();
1655 let props = Default::default();
1656 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1657 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1658 assert!(res.is_err());
1659 if let Err(err) = res {
1660 assert_eq!(
1661 format!("{err}"),
1662 "Parquet error: Expected to write 4 values, but have only 2"
1663 );
1664 }
1665 }
1666
1667 #[test]
1668 fn test_column_writer_write_only_one_dictionary_page() {
1669 let page_writer = get_test_page_writer();
1670 let props = Default::default();
1671 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1672 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1673 writer.add_data_page().unwrap();
1675 writer.write_dictionary_page().unwrap();
1676 let err = writer.write_dictionary_page().unwrap_err().to_string();
1677 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1678 }
1679
1680 #[test]
1681 fn test_column_writer_error_when_writing_disabled_dictionary() {
1682 let page_writer = get_test_page_writer();
1683 let props = Arc::new(
1684 WriterProperties::builder()
1685 .set_dictionary_enabled(false)
1686 .build(),
1687 );
1688 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1689 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1690 let err = writer.write_dictionary_page().unwrap_err().to_string();
1691 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1692 }
1693
1694 #[test]
1695 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1696 let page_writer = get_test_page_writer();
1697 let props = Arc::new(
1698 WriterProperties::builder()
1699 .set_dictionary_enabled(true)
1700 .build(),
1701 );
1702 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1703 writer
1704 .write_batch(&[true, false, true, false], None, None)
1705 .unwrap();
1706
1707 let r = writer.close().unwrap();
1708 assert_eq!(r.bytes_written, 1);
1711 assert_eq!(r.rows_written, 4);
1712
1713 let metadata = r.metadata;
1714 assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1715 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1717 }
1718
1719 #[test]
1720 fn test_column_writer_default_encoding_support_bool() {
1721 check_encoding_write_support::<BoolType>(
1722 WriterVersion::PARQUET_1_0,
1723 true,
1724 &[true, false],
1725 None,
1726 &[Encoding::PLAIN, Encoding::RLE],
1727 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1728 );
1729 check_encoding_write_support::<BoolType>(
1730 WriterVersion::PARQUET_1_0,
1731 false,
1732 &[true, false],
1733 None,
1734 &[Encoding::PLAIN, Encoding::RLE],
1735 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1736 );
1737 check_encoding_write_support::<BoolType>(
1738 WriterVersion::PARQUET_2_0,
1739 true,
1740 &[true, false],
1741 None,
1742 &[Encoding::RLE],
1743 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1744 );
1745 check_encoding_write_support::<BoolType>(
1746 WriterVersion::PARQUET_2_0,
1747 false,
1748 &[true, false],
1749 None,
1750 &[Encoding::RLE],
1751 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1752 );
1753 }
1754
1755 #[test]
1756 fn test_column_writer_default_encoding_support_int32() {
1757 check_encoding_write_support::<Int32Type>(
1758 WriterVersion::PARQUET_1_0,
1759 true,
1760 &[1, 2],
1761 Some(0),
1762 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1763 &[
1764 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1765 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1766 ],
1767 );
1768 check_encoding_write_support::<Int32Type>(
1769 WriterVersion::PARQUET_1_0,
1770 false,
1771 &[1, 2],
1772 None,
1773 &[Encoding::PLAIN, Encoding::RLE],
1774 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1775 );
1776 check_encoding_write_support::<Int32Type>(
1777 WriterVersion::PARQUET_2_0,
1778 true,
1779 &[1, 2],
1780 Some(0),
1781 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1782 &[
1783 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1784 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1785 ],
1786 );
1787 check_encoding_write_support::<Int32Type>(
1788 WriterVersion::PARQUET_2_0,
1789 false,
1790 &[1, 2],
1791 None,
1792 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1793 &[encoding_stats(
1794 PageType::DATA_PAGE_V2,
1795 Encoding::DELTA_BINARY_PACKED,
1796 1,
1797 )],
1798 );
1799 }
1800
1801 #[test]
1802 fn test_column_writer_default_encoding_support_int64() {
1803 check_encoding_write_support::<Int64Type>(
1804 WriterVersion::PARQUET_1_0,
1805 true,
1806 &[1, 2],
1807 Some(0),
1808 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1809 &[
1810 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1811 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1812 ],
1813 );
1814 check_encoding_write_support::<Int64Type>(
1815 WriterVersion::PARQUET_1_0,
1816 false,
1817 &[1, 2],
1818 None,
1819 &[Encoding::PLAIN, Encoding::RLE],
1820 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1821 );
1822 check_encoding_write_support::<Int64Type>(
1823 WriterVersion::PARQUET_2_0,
1824 true,
1825 &[1, 2],
1826 Some(0),
1827 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1828 &[
1829 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1830 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1831 ],
1832 );
1833 check_encoding_write_support::<Int64Type>(
1834 WriterVersion::PARQUET_2_0,
1835 false,
1836 &[1, 2],
1837 None,
1838 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1839 &[encoding_stats(
1840 PageType::DATA_PAGE_V2,
1841 Encoding::DELTA_BINARY_PACKED,
1842 1,
1843 )],
1844 );
1845 }
1846
1847 #[test]
1848 fn test_column_writer_default_encoding_support_int96() {
1849 check_encoding_write_support::<Int96Type>(
1850 WriterVersion::PARQUET_1_0,
1851 true,
1852 &[Int96::from(vec![1, 2, 3])],
1853 Some(0),
1854 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1855 &[
1856 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1857 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1858 ],
1859 );
1860 check_encoding_write_support::<Int96Type>(
1861 WriterVersion::PARQUET_1_0,
1862 false,
1863 &[Int96::from(vec![1, 2, 3])],
1864 None,
1865 &[Encoding::PLAIN, Encoding::RLE],
1866 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1867 );
1868 check_encoding_write_support::<Int96Type>(
1869 WriterVersion::PARQUET_2_0,
1870 true,
1871 &[Int96::from(vec![1, 2, 3])],
1872 Some(0),
1873 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1874 &[
1875 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1876 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1877 ],
1878 );
1879 check_encoding_write_support::<Int96Type>(
1880 WriterVersion::PARQUET_2_0,
1881 false,
1882 &[Int96::from(vec![1, 2, 3])],
1883 None,
1884 &[Encoding::PLAIN, Encoding::RLE],
1885 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1886 );
1887 }
1888
1889 #[test]
1890 fn test_column_writer_default_encoding_support_float() {
1891 check_encoding_write_support::<FloatType>(
1892 WriterVersion::PARQUET_1_0,
1893 true,
1894 &[1.0, 2.0],
1895 Some(0),
1896 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1897 &[
1898 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1899 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1900 ],
1901 );
1902 check_encoding_write_support::<FloatType>(
1903 WriterVersion::PARQUET_1_0,
1904 false,
1905 &[1.0, 2.0],
1906 None,
1907 &[Encoding::PLAIN, Encoding::RLE],
1908 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1909 );
1910 check_encoding_write_support::<FloatType>(
1911 WriterVersion::PARQUET_2_0,
1912 true,
1913 &[1.0, 2.0],
1914 Some(0),
1915 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1916 &[
1917 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1918 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1919 ],
1920 );
1921 check_encoding_write_support::<FloatType>(
1922 WriterVersion::PARQUET_2_0,
1923 false,
1924 &[1.0, 2.0],
1925 None,
1926 &[Encoding::PLAIN, Encoding::RLE],
1927 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1928 );
1929 }
1930
1931 #[test]
1932 fn test_column_writer_default_encoding_support_double() {
1933 check_encoding_write_support::<DoubleType>(
1934 WriterVersion::PARQUET_1_0,
1935 true,
1936 &[1.0, 2.0],
1937 Some(0),
1938 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1939 &[
1940 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1941 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1942 ],
1943 );
1944 check_encoding_write_support::<DoubleType>(
1945 WriterVersion::PARQUET_1_0,
1946 false,
1947 &[1.0, 2.0],
1948 None,
1949 &[Encoding::PLAIN, Encoding::RLE],
1950 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1951 );
1952 check_encoding_write_support::<DoubleType>(
1953 WriterVersion::PARQUET_2_0,
1954 true,
1955 &[1.0, 2.0],
1956 Some(0),
1957 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1958 &[
1959 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1960 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1961 ],
1962 );
1963 check_encoding_write_support::<DoubleType>(
1964 WriterVersion::PARQUET_2_0,
1965 false,
1966 &[1.0, 2.0],
1967 None,
1968 &[Encoding::PLAIN, Encoding::RLE],
1969 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1970 );
1971 }
1972
1973 #[test]
1974 fn test_column_writer_default_encoding_support_byte_array() {
1975 check_encoding_write_support::<ByteArrayType>(
1976 WriterVersion::PARQUET_1_0,
1977 true,
1978 &[ByteArray::from(vec![1u8])],
1979 Some(0),
1980 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1981 &[
1982 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1983 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1984 ],
1985 );
1986 check_encoding_write_support::<ByteArrayType>(
1987 WriterVersion::PARQUET_1_0,
1988 false,
1989 &[ByteArray::from(vec![1u8])],
1990 None,
1991 &[Encoding::PLAIN, Encoding::RLE],
1992 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1993 );
1994 check_encoding_write_support::<ByteArrayType>(
1995 WriterVersion::PARQUET_2_0,
1996 true,
1997 &[ByteArray::from(vec![1u8])],
1998 Some(0),
1999 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2000 &[
2001 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2002 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2003 ],
2004 );
2005 check_encoding_write_support::<ByteArrayType>(
2006 WriterVersion::PARQUET_2_0,
2007 false,
2008 &[ByteArray::from(vec![1u8])],
2009 None,
2010 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2011 &[encoding_stats(
2012 PageType::DATA_PAGE_V2,
2013 Encoding::DELTA_BYTE_ARRAY,
2014 1,
2015 )],
2016 );
2017 }
2018
2019 #[test]
2020 fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2021 check_encoding_write_support::<FixedLenByteArrayType>(
2022 WriterVersion::PARQUET_1_0,
2023 true,
2024 &[ByteArray::from(vec![1u8]).into()],
2025 None,
2026 &[Encoding::PLAIN, Encoding::RLE],
2027 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2028 );
2029 check_encoding_write_support::<FixedLenByteArrayType>(
2030 WriterVersion::PARQUET_1_0,
2031 false,
2032 &[ByteArray::from(vec![1u8]).into()],
2033 None,
2034 &[Encoding::PLAIN, Encoding::RLE],
2035 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2036 );
2037 check_encoding_write_support::<FixedLenByteArrayType>(
2038 WriterVersion::PARQUET_2_0,
2039 true,
2040 &[ByteArray::from(vec![1u8]).into()],
2041 Some(0),
2042 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2043 &[
2044 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2045 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2046 ],
2047 );
2048 check_encoding_write_support::<FixedLenByteArrayType>(
2049 WriterVersion::PARQUET_2_0,
2050 false,
2051 &[ByteArray::from(vec![1u8]).into()],
2052 None,
2053 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2054 &[encoding_stats(
2055 PageType::DATA_PAGE_V2,
2056 Encoding::DELTA_BYTE_ARRAY,
2057 1,
2058 )],
2059 );
2060 }
2061
2062 #[test]
2063 fn test_column_writer_check_metadata() {
2064 let page_writer = get_test_page_writer();
2065 let props = Default::default();
2066 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2067 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2068
2069 let r = writer.close().unwrap();
2070 assert_eq!(r.bytes_written, 20);
2071 assert_eq!(r.rows_written, 4);
2072
2073 let metadata = r.metadata;
2074 assert_eq!(
2075 metadata.encodings(),
2076 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2077 );
2078 assert_eq!(metadata.num_values(), 4);
2079 assert_eq!(metadata.compressed_size(), 20);
2080 assert_eq!(metadata.uncompressed_size(), 20);
2081 assert_eq!(metadata.data_page_offset(), 0);
2082 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2083 if let Some(stats) = metadata.statistics() {
2084 assert_eq!(stats.null_count_opt(), Some(0));
2085 assert_eq!(stats.distinct_count_opt(), None);
2086 if let Statistics::Int32(stats) = stats {
2087 assert_eq!(stats.min_opt().unwrap(), &1);
2088 assert_eq!(stats.max_opt().unwrap(), &4);
2089 } else {
2090 panic!("expecting Statistics::Int32");
2091 }
2092 } else {
2093 panic!("metadata missing statistics");
2094 }
2095 }
2096
2097 #[test]
2098 fn test_column_writer_check_byte_array_min_max() {
2099 let page_writer = get_test_page_writer();
2100 let props = Default::default();
2101 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2102 writer
2103 .write_batch(
2104 &[
2105 ByteArray::from(vec![
2106 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2107 35u8, 231u8, 90u8, 0u8, 0u8,
2108 ]),
2109 ByteArray::from(vec![
2110 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2111 152u8, 177u8, 56u8, 0u8, 0u8,
2112 ]),
2113 ByteArray::from(vec![
2114 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2115 0u8,
2116 ]),
2117 ByteArray::from(vec![
2118 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2119 44u8, 0u8, 0u8,
2120 ]),
2121 ],
2122 None,
2123 None,
2124 )
2125 .unwrap();
2126 let metadata = writer.close().unwrap().metadata;
2127 if let Some(stats) = metadata.statistics() {
2128 if let Statistics::ByteArray(stats) = stats {
2129 assert_eq!(
2130 stats.min_opt().unwrap(),
2131 &ByteArray::from(vec![
2132 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2133 35u8, 231u8, 90u8, 0u8, 0u8,
2134 ])
2135 );
2136 assert_eq!(
2137 stats.max_opt().unwrap(),
2138 &ByteArray::from(vec![
2139 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2140 44u8, 0u8, 0u8,
2141 ])
2142 );
2143 } else {
2144 panic!("expecting Statistics::ByteArray");
2145 }
2146 } else {
2147 panic!("metadata missing statistics");
2148 }
2149 }
2150
2151 #[test]
2152 fn test_column_writer_uint32_converted_type_min_max() {
2153 let page_writer = get_test_page_writer();
2154 let props = Default::default();
2155 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2156 page_writer,
2157 0,
2158 0,
2159 props,
2160 );
2161 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2162 let metadata = writer.close().unwrap().metadata;
2163 if let Some(stats) = metadata.statistics() {
2164 if let Statistics::Int32(stats) = stats {
2165 assert_eq!(stats.min_opt().unwrap(), &0,);
2166 assert_eq!(stats.max_opt().unwrap(), &5,);
2167 } else {
2168 panic!("expecting Statistics::Int32");
2169 }
2170 } else {
2171 panic!("metadata missing statistics");
2172 }
2173 }
2174
2175 #[test]
2176 fn test_column_writer_precalculated_statistics() {
2177 let page_writer = get_test_page_writer();
2178 let props = Arc::new(
2179 WriterProperties::builder()
2180 .set_statistics_enabled(EnabledStatistics::Chunk)
2181 .build(),
2182 );
2183 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2184 writer
2185 .write_batch_with_statistics(
2186 &[1, 2, 3, 4],
2187 None,
2188 None,
2189 Some(&-17),
2190 Some(&9000),
2191 Some(55),
2192 )
2193 .unwrap();
2194
2195 let r = writer.close().unwrap();
2196 assert_eq!(r.bytes_written, 20);
2197 assert_eq!(r.rows_written, 4);
2198
2199 let metadata = r.metadata;
2200 assert_eq!(
2201 metadata.encodings(),
2202 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2203 );
2204 assert_eq!(metadata.num_values(), 4);
2205 assert_eq!(metadata.compressed_size(), 20);
2206 assert_eq!(metadata.uncompressed_size(), 20);
2207 assert_eq!(metadata.data_page_offset(), 0);
2208 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2209 if let Some(stats) = metadata.statistics() {
2210 assert_eq!(stats.null_count_opt(), Some(0));
2211 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2212 if let Statistics::Int32(stats) = stats {
2213 assert_eq!(stats.min_opt().unwrap(), &-17);
2214 assert_eq!(stats.max_opt().unwrap(), &9000);
2215 } else {
2216 panic!("expecting Statistics::Int32");
2217 }
2218 } else {
2219 panic!("metadata missing statistics");
2220 }
2221 }
2222
    /// Mixes a batch with computed statistics and a batch with caller-supplied statistics,
    /// then checks both the chunk statistics and the statistics stored with the data page.
    ///
    /// Expected outcome: min 1, max 7, null count 0 and no distinct count, at chunk and
    /// page level.
    #[test]
    fn test_mixed_precomputed_statistics() {
        // `write` borrows `buf` until it is dropped below.
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = Arc::new(
            WriterProperties::builder()
                .set_write_page_header_statistics(true)
                .build(),
        );
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

        // First batch: no statistics supplied. Second batch: min 5, max 7, distinct count 3.
        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
        writer
            .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
            .unwrap();

        let r = writer.close().unwrap();

        // Chunk-level statistics cover both batches; the distinct count is not reported.
        let stats = r.metadata.statistics().unwrap();
        assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
        assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
        assert_eq!(stats.null_count_opt(), Some(0));
        assert!(stats.distinct_count_opt().is_none());

        // Release the borrow of `buf` so it can be moved into the reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        // One dictionary page followed by one v1 data page.
        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

        // Page-level statistics match the chunk-level ones.
        let page_statistics = pages[1].statistics().unwrap();
        assert_eq!(
            page_statistics.min_bytes_opt().unwrap(),
            1_i32.to_le_bytes()
        );
        assert_eq!(
            page_statistics.max_bytes_opt().unwrap(),
            7_i32.to_le_bytes()
        );
        assert_eq!(page_statistics.null_count_opt(), Some(0));
        assert!(page_statistics.distinct_count_opt().is_none());
    }
2280
    /// With statistics disabled and writer version 2, neither the chunk metadata nor the
    /// data page carries statistics, while the v2 page still reports value, null and row
    /// counts.
    #[test]
    fn test_disabled_statistics() {
        // `write` borrows `buf` until it is dropped below.
        let mut buf = Vec::with_capacity(100);
        let mut write = TrackedWrite::new(&mut buf);
        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
        let props = WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build();
        let props = Arc::new(props);

        // Six definition levels, four of them 1: four values and two nulls.
        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
        writer
            .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
            .unwrap();

        let r = writer.close().unwrap();
        assert!(r.metadata.statistics().is_none());

        // Release the borrow of `buf` so it can be moved into the reader.
        drop(write);

        let props = ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build();
        let reader = SerializedPageReader::new_with_properties(
            Arc::new(Bytes::from(buf)),
            &r.metadata,
            r.rows_written as usize,
            None,
            Arc::new(props),
        )
        .unwrap();

        // One dictionary page followed by one v2 data page.
        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
        assert_eq!(pages.len(), 2);

        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

        match &pages[1] {
            Page::DataPageV2 {
                num_values,
                num_nulls,
                num_rows,
                statistics,
                ..
            } => {
                assert_eq!(*num_values, 6);
                assert_eq!(*num_nulls, 2);
                assert_eq!(*num_rows, 6);
                assert!(statistics.is_none());
            }
            _ => unreachable!(),
        }
    }
2336
2337 #[test]
2338 fn test_column_writer_empty_column_roundtrip() {
2339 let props = Default::default();
2340 column_roundtrip::<Int32Type>(props, &[], None, None);
2341 }
2342
2343 #[test]
2344 fn test_column_writer_non_nullable_values_roundtrip() {
2345 let props = Default::default();
2346 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2347 }
2348
/// Round-trips random `Int32` data over the full `i32` range with default
/// properties; the two trailing arguments are 10 and 0 (presumably the max
/// definition/repetition levels — confirm against `column_roundtrip_random`).
#[test]
fn test_column_writer_nullable_non_repeated_values_roundtrip() {
    let (lowest, highest) = (i32::MIN, i32::MAX);
    column_roundtrip_random::<Int32Type>(Default::default(), 1024, lowest, highest, 10, 0);
}
2354
/// Round-trips random `Int32` data over the full `i32` range with default
/// properties; the two trailing arguments are both 10 (presumably the max
/// definition/repetition levels — confirm against `column_roundtrip_random`).
#[test]
fn test_column_writer_nullable_repeated_values_roundtrip() {
    let (lowest, highest) = (i32::MIN, i32::MAX);
    column_roundtrip_random::<Int32Type>(Default::default(), 1024, lowest, highest, 10, 10);
}
2360
/// Round-trips random `Int32` data while both the dictionary page size limit
/// and the data page size limit are set to 32.
#[test]
fn test_column_writer_dictionary_fallback_small_data_page() {
    let builder = WriterProperties::builder()
        .set_dictionary_page_size_limit(32)
        .set_data_page_size_limit(32);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2369
/// Repeats the random `Int32` round trip once per write batch size in
/// `[1, 2, 5, 10, 11, 1023]`.
#[test]
fn test_column_writer_small_write_batch_size() {
    for batch_size in [1usize, 2, 5, 10, 11, 1023] {
        let builder = WriterProperties::builder().set_write_batch_size(batch_size);
        column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
    }
}
2378
/// Round-trips random `Int32` data with dictionary encoding disabled under
/// writer version `PARQUET_1_0`.
#[test]
fn test_column_writer_dictionary_disabled_v1() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_dictionary_enabled(false);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2387
/// Round-trips random `Int32` data with dictionary encoding disabled under
/// writer version `PARQUET_2_0`.
#[test]
fn test_column_writer_dictionary_disabled_v2() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2396
/// Round-trips random `Int32` data with Snappy compression under writer
/// version `PARQUET_1_0`.
#[test]
fn test_column_writer_compression_v1() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_compression(Compression::SNAPPY);
    column_roundtrip_random::<Int32Type>(builder.build(), 2048, i32::MIN, i32::MAX, 10, 10);
}
2405
/// Round-trips random `Int32` data with Snappy compression under writer
/// version `PARQUET_2_0`.
#[test]
fn test_column_writer_compression_v2() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_compression(Compression::SNAPPY);
    column_roundtrip_random::<Int32Type>(builder.build(), 2048, i32::MIN, i32::MAX, 10, 10);
}
2414
/// Writes ten `Int32` values with a 10-byte data page size limit and a write
/// batch size of 3, then reads the pages back from a temp file. Expects one
/// dictionary page followed by two data pages, plus matching page encoding
/// stats in the column chunk metadata.
#[test]
fn test_column_writer_add_data_pages_with_dict() {
    let mut file = tempfile::tempfile().unwrap();
    let mut tracked = TrackedWrite::new(&mut file);
    let page_writer = Box::new(SerializedPageWriter::new(&mut tracked));
    let builder = WriterProperties::builder()
        .set_data_page_size_limit(10)
        .set_write_batch_size(3);
    let writer_props = Arc::new(builder.build());

    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, writer_props);
    writer
        .write_batch(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10], None, None)
        .unwrap();
    let close_result = writer.close().unwrap();

    // The tracked writer borrows `file` mutably; drop it before moving `file`.
    drop(tracked);

    let reader_props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .build();
    let mut page_reader = Box::new(
        SerializedPageReader::new_with_properties(
            Arc::new(file),
            &close_result.metadata,
            close_result.rows_written as usize,
            None,
            Arc::new(reader_props),
        )
        .unwrap(),
    );

    // Drain the reader, recording (page type, value count, buffer length).
    let observed: Vec<_> = std::iter::from_fn(|| page_reader.get_next_page().unwrap())
        .map(|page| (page.page_type(), page.num_values(), page.buffer().len()))
        .collect();
    assert_eq!(
        observed,
        vec![
            (PageType::DICTIONARY_PAGE, 10, 40),
            (PageType::DATA_PAGE, 9, 10),
            (PageType::DATA_PAGE, 1, 3),
        ]
    );

    let expected_encoding_stats = vec![
        PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: Encoding::PLAIN,
            count: 1,
        },
        PageEncodingStats {
            page_type: PageType::DATA_PAGE,
            encoding: Encoding::RLE_DICTIONARY,
            count: 2,
        },
    ];
    assert_eq!(
        close_result.metadata.page_encoding_stats(),
        Some(&expected_encoding_stats)
    );
}
2477
/// Statistics round trip for booleans: expects min `false`, max `true`, and
/// the min/max backwards-compatibility flag to be unset.
#[test]
fn test_bool_statistics() {
    let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Boolean(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &false);
            assert_eq!(typed.max_opt().unwrap(), &true);
        }
        _ => panic!("expecting Statistics::Boolean, got {stats:?}"),
    }
}
2491
/// Statistics round trip for `i32`: expects min -2, max 3, and the min/max
/// backwards-compatibility flag to be set.
#[test]
fn test_int32_statistics() {
    let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2);
            assert_eq!(typed.max_opt().unwrap(), &3);
        }
        _ => panic!("expecting Statistics::Int32, got {stats:?}"),
    }
}
2503
/// Statistics round trip for `i64`: expects min -2, max 3, and the min/max
/// backwards-compatibility flag to be set.
#[test]
fn test_int64_statistics() {
    let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Int64(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2);
            assert_eq!(typed.max_opt().unwrap(), &3);
        }
        _ => panic!("expecting Statistics::Int64, got {stats:?}"),
    }
}
2515
/// Statistics round trip for `Int96`: expects min `[0, 20, 30]`, max
/// `[3, 20, 10]`, and the min/max backwards-compatibility flag to be unset.
#[test]
fn test_int96_statistics() {
    let input: Vec<Int96> = vec![
        Int96::from(vec![1, 20, 30]),
        Int96::from(vec![3, 20, 10]),
        Int96::from(vec![0, 20, 30]),
        Int96::from(vec![2, 20, 30]),
    ];

    let stats = statistics_roundtrip::<Int96Type>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Int96(typed) => {
            let expected_min = Int96::from(vec![0, 20, 30]);
            let expected_max = Int96::from(vec![3, 20, 10]);
            assert_eq!(typed.min_opt().unwrap(), &expected_min);
            assert_eq!(typed.max_opt().unwrap(), &expected_max);
        }
        _ => panic!("expecting Statistics::Int96, got {stats:?}"),
    }
}
2536
/// Statistics round trip for `f32`: expects min -2.0, max 3.0, and the
/// min/max backwards-compatibility flag to be set.
#[test]
fn test_float_statistics() {
    let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            assert_eq!(typed.max_opt().unwrap(), &3.0);
        }
        _ => panic!("expecting Statistics::Float, got {stats:?}"),
    }
}
2548
/// Statistics round trip for `f64`: expects min -2.0, max 3.0, and the
/// min/max backwards-compatibility flag to be set.
#[test]
fn test_double_statistics() {
    let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            assert_eq!(typed.max_opt().unwrap(), &3.0);
        }
        _ => panic!("expecting Statistics::Double, got {stats:?}"),
    }
}
2560
/// Statistics round trip for byte arrays built from strings: expects min
/// `"aaw"`, max `"zz"`, and the min/max backwards-compatibility flag unset.
#[test]
fn test_byte_array_statistics() {
    let input: Vec<ByteArray> = ["aawaa", "zz", "aaw", "m", "qrs"]
        .into_iter()
        .map(ByteArray::from)
        .collect();

    let stats = statistics_roundtrip::<ByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::ByteArray(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &ByteArray::from("aaw"));
            assert_eq!(typed.max_opt().unwrap(), &ByteArray::from("zz"));
        }
        _ => panic!("expecting Statistics::ByteArray, got {stats:?}"),
    }
}
2577
/// Statistics round trip for fixed-length byte arrays built from strings:
/// expects the `"aaw "` entry as min, the `"zz "` entry as max, and the
/// min/max backwards-compatibility flag unset.
#[test]
fn test_fixed_len_byte_array_statistics() {
    let input: Vec<FixedLenByteArray> = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
        .into_iter()
        .map(|text| FixedLenByteArray::from(ByteArray::from(text)))
        .collect();

    let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::FixedLenByteArray(typed) => {
            let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
            let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
            assert_eq!(typed.min_opt().unwrap(), &expected_min);
            assert_eq!(typed.max_opt().unwrap(), &expected_max);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray, got {stats:?}"),
    }
}
2596
/// Float16 statistics round trip over `[-1, 3, -2, 2]`: expects min -2 and
/// max 3, with the min/max backwards-compatibility flag set.
#[test]
fn test_column_writer_check_float16_min_max() {
    let two = f16::from_f32(2.0);
    let three = f16::from_f32(3.0);
    let input: Vec<_> = [-f16::ONE, three, -two, two]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let expected_min = ByteArray::from(-f16::from_f32(2.0));
    let expected_max = ByteArray::from(f16::from_f32(3.0));
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2620
/// Float16 statistics round trip with a NaN between 1 and 2: expects min 1
/// and max 2, with the min/max backwards-compatibility flag set.
#[test]
fn test_column_writer_check_float16_nan_middle() {
    let two = f16::ONE + f16::ONE;
    let input: Vec<_> = [f16::ONE, f16::NAN, two]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2636
/// Float16 statistics round trip for `[1, NaN, 2]`: expects min 1 and max 2,
/// with the min/max backwards-compatibility flag set.
#[test]
fn test_float16_statistics_nan_middle() {
    let one = f16::ONE;
    let two = one + one;
    let input: Vec<_> = [one, f16::NAN, two]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(one));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2652
/// Float16 statistics round trip for `[NaN, 1, 2]`: expects min 1 and max 2,
/// with the min/max backwards-compatibility flag set.
#[test]
fn test_float16_statistics_nan_start() {
    let one = f16::ONE;
    let two = one + one;
    let input: Vec<_> = [f16::NAN, one, two]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(one));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2668
/// Float16 statistics round trip for an all-NaN input: expects no min/max
/// bytes, with the min/max backwards-compatibility flag still set.
#[test]
fn test_float16_statistics_nan_only() {
    let input: Vec<_> = [f16::NAN; 2]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert_eq!(stats.min_bytes_opt(), None);
    assert_eq!(stats.max_bytes_opt(), None);
    assert!(stats.is_min_max_backwards_compatible());
}
2681
/// Float16 statistics round trip for a single `+0`: expects min `-0` and max
/// `+0`, with the min/max backwards-compatibility flag set.
#[test]
fn test_float16_statistics_zero_only() {
    let input: Vec<_> = std::iter::once(f16::ZERO)
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let (expected_min, expected_max) = (f16::NEG_ZERO, f16::ZERO);
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(expected_min));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(expected_max));
}
2694
/// Float16 statistics round trip for a single `-0`: expects min `-0` and max
/// `+0`, with the min/max backwards-compatibility flag set.
#[test]
fn test_float16_statistics_neg_zero_only() {
    let input: Vec<_> = std::iter::once(f16::NEG_ZERO)
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let (expected_min, expected_max) = (f16::NEG_ZERO, f16::ZERO);
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(expected_min));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(expected_max));
}
2707
/// Float16 statistics round trip for `[+0, 1, NaN, PI]`: expects min `-0`
/// and max `PI`, with the min/max backwards-compatibility flag set.
#[test]
fn test_float16_statistics_zero_min() {
    let halves = [f16::ZERO, f16::ONE, f16::NAN, f16::PI];
    let input: Vec<_> = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let observed_min = stats.min_opt().unwrap();
    let observed_max = stats.max_opt().unwrap();
    assert_eq!(observed_min, &ByteArray::from(f16::NEG_ZERO));
    assert_eq!(observed_max, &ByteArray::from(f16::PI));
}
2720
/// Float16 statistics round trip for `[-0, -1, NaN, -PI]`: expects min `-PI`
/// and max `+0`, with the min/max backwards-compatibility flag set.
#[test]
fn test_float16_statistics_neg_zero_max() {
    let halves = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI];
    let input: Vec<_> = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());

    let observed_min = stats.min_opt().unwrap();
    let observed_max = stats.max_opt().unwrap();
    assert_eq!(observed_min, &ByteArray::from(-f16::PI));
    assert_eq!(observed_max, &ByteArray::from(f16::ZERO));
}
2733
/// `f32` statistics round trip for `[1.0, NaN, 2.0]`: expects min 1.0 and
/// max 2.0, with the min/max backwards-compatibility flag set.
#[test]
fn test_float_statistics_nan_middle() {
    let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Float, got {other:?}"),
    }
}
2745
/// `f32` statistics round trip for `[NaN, 1.0, 2.0]`: expects min 1.0 and
/// max 2.0, with the min/max backwards-compatibility flag set.
#[test]
fn test_float_statistics_nan_start() {
    let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Float, got {other:?}"),
    }
}
2757
/// `f32` statistics round trip for an all-NaN input: expects no min/max
/// bytes, the backwards-compatibility flag set, and the `Float` variant.
#[test]
fn test_float_statistics_nan_only() {
    let all_nan = [f32::NAN; 2];
    let stats = statistics_roundtrip::<FloatType>(&all_nan);
    assert_eq!(stats.min_bytes_opt(), None);
    assert_eq!(stats.max_bytes_opt(), None);
    assert!(stats.is_min_max_backwards_compatible());
    assert!(matches!(stats, Statistics::Float(_)));
}
2766
/// `f32` statistics round trip for a single `0.0`: expects min to be
/// negative zero and max to be positive zero.
#[test]
fn test_float_statistics_zero_only() {
    let stats = statistics_roundtrip::<FloatType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            let min = typed.min_opt().unwrap();
            let max = typed.max_opt().unwrap();
            // `-0.0 == 0.0` numerically, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Float, got {other:?}"),
    }
}
2780
/// `f32` statistics round trip for a single `-0.0`: expects min to be
/// negative zero and max to be positive zero.
#[test]
fn test_float_statistics_neg_zero_only() {
    let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            let min = typed.min_opt().unwrap();
            let max = typed.max_opt().unwrap();
            // `-0.0 == 0.0` numerically, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Float, got {other:?}"),
    }
}
2794
/// `f32` statistics round trip for `[0.0, 1.0, NaN, 2.0]`: expects min to be
/// negative zero and max to be 2.0.
#[test]
fn test_float_statistics_zero_min() {
    let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            let min = typed.min_opt().unwrap();
            // `-0.0 == 0.0` numerically, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Float, got {other:?}"),
    }
}
2807
/// `f32` statistics round trip for `[-0.0, -1.0, NaN, -2.0]`: expects min
/// -2.0 and max to be positive zero.
#[test]
fn test_float_statistics_neg_zero_max() {
    let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            let max = typed.max_opt().unwrap();
            // `-0.0 == 0.0` numerically, so the sign is checked separately.
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Float, got {other:?}"),
    }
}
2820
/// `f64` statistics round trip for `[1.0, NaN, 2.0]`: expects min 1.0 and
/// max 2.0, with the min/max backwards-compatibility flag set.
#[test]
fn test_double_statistics_nan_middle() {
    let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Double, got {other:?}"),
    }
}
2832
/// `f64` statistics round trip for `[NaN, 1.0, 2.0]`: expects min 1.0 and
/// max 2.0, with the min/max backwards-compatibility flag set.
#[test]
fn test_double_statistics_nan_start() {
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Double, got {other:?}"),
    }
}
2844
/// `f64` statistics round trip for an all-NaN input: expects no min/max
/// bytes, the `Double` variant, and the backwards-compatibility flag set.
#[test]
fn test_double_statistics_nan_only() {
    let all_nan = [f64::NAN; 2];
    let stats = statistics_roundtrip::<DoubleType>(&all_nan);
    assert_eq!(stats.min_bytes_opt(), None);
    assert_eq!(stats.max_bytes_opt(), None);
    assert!(matches!(stats, Statistics::Double(_)));
    assert!(stats.is_min_max_backwards_compatible());
}
2853
/// `f64` statistics round trip for a single `0.0`: expects min to be
/// negative zero and max to be positive zero.
#[test]
fn test_double_statistics_zero_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            let min = typed.min_opt().unwrap();
            let max = typed.max_opt().unwrap();
            // `-0.0 == 0.0` numerically, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Double, got {other:?}"),
    }
}
2867
/// `f64` statistics round trip for a single `-0.0`: expects min to be
/// negative zero and max to be positive zero.
#[test]
fn test_double_statistics_neg_zero_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            let min = typed.min_opt().unwrap();
            let max = typed.max_opt().unwrap();
            // `-0.0 == 0.0` numerically, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Double, got {other:?}"),
    }
}
2881
/// `f64` statistics round trip for `[0.0, 1.0, NaN, 2.0]`: expects min to be
/// negative zero and max to be 2.0.
#[test]
fn test_double_statistics_zero_min() {
    let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            let min = typed.min_opt().unwrap();
            // `-0.0 == 0.0` numerically, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Double, got {other:?}"),
    }
}
2894
/// `f64` statistics round trip for `[-0.0, -1.0, NaN, -2.0]`: expects min
/// -2.0 and max to be positive zero.
#[test]
fn test_double_statistics_neg_zero_max() {
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            let max = typed.max_opt().unwrap();
            // `-0.0 == 0.0` numerically, so the sign is checked separately.
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::Double, got {other:?}"),
    }
}
2907
/// Table-driven check of `compare_greater_byte_array_decimals(lhs, rhs)`
/// over empty, single-byte, and multi-byte inputs, including one whose
/// leading byte is 255.
#[test]
fn test_compare_greater_byte_array_decimals() {
    // (lhs, rhs, expected result of `lhs > rhs`)
    let cases: [(&[u8], &[u8], bool); 9] = [
        (&[], &[], false),
        (&[1u8], &[], true),
        (&[], &[1u8], false),
        (&[1u8], &[0u8], true),
        (&[1u8], &[1u8], false),
        (&[1u8, 0u8], &[0u8], true),
        (&[0u8, 1u8], &[1u8, 0u8], false),
        (&[255u8, 35u8, 0u8, 0u8], &[0u8], false),
        (&[0u8], &[255u8, 35u8, 0u8, 0u8], true),
    ];
    for (lhs, rhs, expected) in cases {
        assert!(compare_greater_byte_array_decimals(lhs, rhs) == expected);
    }
}
2929
/// Writes a batch with no values and four definition levels of 0, then
/// checks the resulting column index: a null page with empty min/max, a null
/// count of 4, no repetition-level histogram, and a definition-level
/// histogram of `[4, 0]`.
#[test]
fn test_column_index_with_null_pages() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 0, Default::default());
    writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.column_index.is_some());
    let column_index = close_result.column_index.unwrap();

    // The only page is flagged as null and carries empty min/max entries.
    assert!(column_index.null_pages[0]);
    assert!(column_index.min_values[0].is_empty());
    assert!(column_index.max_values[0].is_empty());

    // All four levels written are counted as nulls.
    assert!(column_index.null_counts.is_some());
    assert_eq!(column_index.null_counts.as_ref().unwrap()[0], 4);

    // Histograms: none for repetition levels, `[4, 0]` for definition levels.
    assert!(column_index.repetition_level_histograms.is_none());
    assert!(column_index.definition_level_histograms.is_some());
    let def_histogram = column_index.definition_level_histograms.unwrap();
    assert_eq!(def_histogram, &[4, 0]);
}
2955
/// Writes two `Int32` batches separated by an explicit `flush_data_pages`
/// call and checks the column and offset indexes: two unordered non-null
/// pages, chunk-level min/max bytes equal to the second page's index
/// entries, and first-row indexes of 0 and 4.
#[test]
fn test_column_offset_index_metadata() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    // Flush between batches; the assertions below expect two pages.
    writer.flush_data_pages().unwrap();
    writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

    let close_result = writer.close().unwrap();
    let column_index = close_result.column_index.unwrap();
    let offset_index = close_result.offset_index.unwrap();

    assert_eq!(8, close_result.rows_written);

    assert_eq!(2, column_index.null_pages.len());
    assert_eq!(2, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
    (0..2).for_each(|page| {
        assert!(!column_index.null_pages[page]);
        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[page]);
    });

    let chunk_stats = match close_result.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert_eq!(chunk_stats.distinct_count_opt(), None);
    match chunk_stats {
        Statistics::Int32(typed) => {
            // The second page holds both the overall min (-5) and max (8).
            assert_eq!(
                typed.min_bytes_opt(),
                Some(column_index.min_values[1].as_slice())
            );
            assert_eq!(
                typed.max_bytes_opt(),
                column_index.max_values.get(1).map(Vec::as_slice)
            );
        }
        _ => panic!("expecting Statistics::Int32"),
    }

    let first_rows = &offset_index.page_locations;
    assert_eq!(0, first_rows[0].first_row_index);
    assert_eq!(4, first_rows[1].first_row_index);
}
3010
/// Writes three 200-byte fixed-length values (all `a`, all `p`, all `b`)
/// with the statistics truncate length set to `None`, and checks that the
/// column index min/max have length `DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH`
/// while the chunk-level statistics differ from them.
#[test]
fn test_column_offset_index_metadata_truncating() {
    let page_writer = get_test_page_writer();
    let props = WriterProperties::builder()
        .set_statistics_truncate_length(None)
        .build()
        .into();
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    // 97 = b'a', 112 = b'p', 98 = b'b'; each value repeats one byte 200 times.
    let data: Vec<FixedLenByteArray> = [97_u8, 112_u8, 98_u8]
        .into_iter()
        .map(|fill| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(vec![fill; 200]));
            value
        })
        .collect();

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    let column_index = close_result.column_index.unwrap();
    let offset_index = close_result.offset_index.unwrap();

    assert_eq!(3, close_result.rows_written);

    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let chunk_stats = match close_result.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert_eq!(chunk_stats.distinct_count_opt(), None);
    match chunk_stats {
        Statistics::FixedLenByteArray(typed) => {
            let index_min = &column_index.min_values[0];
            let index_max = &column_index.max_values[0];

            // Chunk statistics are expected to differ from the index values.
            assert_ne!(typed.min_bytes_opt(), Some(index_min.as_slice()));
            assert_ne!(typed.max_bytes_opt(), Some(index_max.as_slice()));

            assert_eq!(
                index_min.len(),
                DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
            );
            assert_eq!(index_min.as_slice(), &[97_u8; 64]);
            assert_eq!(
                index_max.len(),
                DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
            );

            // The last byte of the index max is one above its first byte.
            let first_byte = *index_max.first().unwrap();
            let last_byte = *index_max.last().unwrap();
            assert_eq!(last_byte, first_byte + 1);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3086
/// Writes the single value `"Blart Versenwald III"` with the column index
/// truncate length set to 1 and checks that the column index min/max become
/// `"B"` and `"C"`, while the chunk-level statistics bytes differ from them.
#[test]
fn test_column_offset_index_truncating_spec_example() {
    let page_writer = get_test_page_writer();

    let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
    let props = Arc::new(builder.build());
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut data = vec![FixedLenByteArray::default(); 1];
    data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(1, r.rows_written);

    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let stats = match r.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        // Use the typed binding for the byte accessors instead of binding
        // it and then reading through the outer enum.
        Statistics::FixedLenByteArray(typed) => {
            let column_index_min_value = &column_index.min_values[0];
            let column_index_max_value = &column_index.max_values[0];

            assert_eq!(column_index_min_value.len(), 1);
            assert_eq!(column_index_max_value.len(), 1);

            assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
            assert_eq!("C".as_bytes(), column_index_max_value.as_slice());

            // Chunk statistics are expected to differ from the index values.
            assert_ne!(column_index_min_value, typed.min_bytes_opt().unwrap());
            assert_ne!(column_index_max_value, typed.max_bytes_opt().unwrap());
        }
        // Report the variant actually received so a failure is diagnosable.
        other => panic!("expecting Statistics::FixedLenByteArray, got {other:?}"),
    }
}
3141
/// Writes a single Float16 value (`f16::PI`) with the column index truncate
/// length set to 1 and checks that both the column index and the chunk
/// statistics still report the full little-endian bytes as min and max.
#[test]
fn test_float16_min_max_no_truncation() {
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_float16_column_writer(get_test_page_writer(), writer_props);

    let expected_value = f16::PI.to_le_bytes().to_vec();
    let data = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();

    // Column index entries must equal the untruncated value.
    let column_index = close_result.column_index.unwrap();
    let index_min = column_index.min_values[0].as_slice();
    let index_max = column_index.max_values[0].as_slice();
    assert_eq!(expected_value, index_min);
    assert_eq!(expected_value, index_max);

    // Chunk-level statistics must equal the untruncated value as well.
    match close_result.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            assert_eq!(expected_value, typed.min_bytes_opt().unwrap());
            assert_eq!(expected_value, typed.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3176
/// Writes a single 16-byte value through the decimal test column writer with
/// the column index truncate length set to 1 and checks that both the column
/// index and the chunk statistics still report the full bytes as min and max.
#[test]
fn test_decimal_min_max_no_truncation() {
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let page_writer = get_test_page_writer();
    let mut writer = get_test_decimals_column_writer::<FixedLenByteArrayType>(
        page_writer,
        0,
        0,
        writer_props,
    );

    // Eight leading 0xFF bytes followed by the eight low-order bytes.
    let mut expected_value = vec![255u8; 8];
    expected_value.extend([179u8, 172u8, 19u8, 35u8, 231u8, 90u8, 0u8, 0u8]);

    let data = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();

    // Column index entries must equal the untruncated value.
    let column_index = close_result.column_index.unwrap();
    let index_min = column_index.min_values[0].as_slice();
    let index_max = column_index.max_values[0].as_slice();
    assert_eq!(expected_value, index_min);
    assert_eq!(expected_value, index_max);

    // Chunk-level statistics must equal the untruncated value as well.
    match close_result.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            assert_eq!(expected_value, typed.min_bytes_opt().unwrap());
            assert_eq!(expected_value, typed.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3215
/// Writes one byte-array value longer than 64 bytes using default writer
/// properties and checks that the chunk statistics min/max are inexact,
/// 64 bytes long, and that the max has its final byte incremented.
#[test]
fn test_statistics_truncating_byte_array_default() {
    let page_writer = get_test_page_writer();
    let props = WriterProperties::builder().build().into();
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut long_value = ByteArray::default();
    long_value.set_data(Bytes::from(String::from(
        "This string is longer than 64 bytes, so it will almost certainly be truncated.",
    )));
    writer.write_batch(&[long_value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert_eq!(1, close_result.rows_written);

    match close_result.metadata.statistics().expect("statistics") {
        Statistics::ByteArray(typed) => {
            let min_value = typed.min_opt().unwrap();
            let max_value = typed.max_opt().unwrap();

            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            let expected_len = 64;
            assert_eq!(min_value.len(), expected_len);
            assert_eq!(max_value.len(), expected_len);

            // Min is the plain 64-byte prefix of the input.
            let expected_min =
                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
            // Max is the same prefix with the last byte bumped ('y' -> 'z').
            let expected_max =
                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
            assert_eq!(expected_min, min_value.as_bytes());
            assert_eq!(expected_max, max_value.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
3258
/// Writes `"Blart Versenwald III"` as a byte array with the statistics
/// truncate length set to 1 and checks that the chunk statistics min/max are
/// inexact, one byte long, and equal to `"B"` and `"C"` respectively.
#[test]
fn test_statistics_truncating_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    let page_writer = get_test_page_writer();
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, writer_props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    writer.write_batch(&[value], None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert_eq!(1, close_result.rows_written);

    let chunk_stats = close_result.metadata.statistics().expect("statistics");
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert_eq!(chunk_stats.distinct_count_opt(), None);
    match chunk_stats {
        Statistics::ByteArray(typed) => {
            let min_value = typed.min_opt().unwrap();
            let max_value = typed.max_opt().unwrap();

            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            // Min keeps the first byte; max is that byte incremented.
            assert_eq!("B".as_bytes(), min_value.as_bytes());
            assert_eq!("C".as_bytes(), max_value.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
3302
/// Writes one 16-byte `FixedLenByteArray` (the big-endian bytes of an `i128`)
/// with the statistics truncate length set to one byte and checks the
/// resulting chunk statistics.
///
/// Expectations: both bounds are inexact and one byte long; min equals the
/// leading byte of the value, max equals that byte plus one; and, when each
/// bound is zero-extended back to 16 bytes and read as a big-endian `i128`,
/// min is `<=` and max is `>=` the original value.
#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
    const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

    const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
    const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
        [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

    /// Builds the big-endian `i128` whose most significant byte is `leading`
    /// and whose remaining 15 bytes are zero.
    fn zero_extended(leading: u8) -> i128 {
        let mut bytes = [0u8; 16];
        bytes[0] = leading;
        i128::from_be_bytes(bytes)
    }

    let page_writer = get_test_page_writer();
    let builder =
        WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
    let props = Arc::new(builder.build());
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut data = vec![FixedLenByteArray::default(); 1];
    data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);

    let fixed_stats = match stats {
        Statistics::FixedLenByteArray(inner) => inner,
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    };

    let min_value = fixed_stats.min_opt().unwrap();
    let max_value = fixed_stats.max_opt().unwrap();

    // Truncated bounds must not claim to be exact.
    assert!(!fixed_stats.min_is_exact());
    assert!(!fixed_stats.max_is_exact());

    assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
    assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

    assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
    assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

    let reconstructed_min = zero_extended(min_value.as_bytes()[0]);
    let reconstructed_max = zero_extended(max_value.as_bytes()[0]);

    // The diagnostic values are only shown when an assertion fails, instead
    // of being printed on every run.
    assert!(
        reconstructed_min <= PSEUDO_DECIMAL_VALUE,
        "min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}"
    );
    assert!(
        reconstructed_max >= PSEUDO_DECIMAL_VALUE,
        "max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}"
    );
}
3398
/// Compile-time check that `ColumnWriterImpl<Int32Type>` implements `Send`.
///
/// The generic helper has an empty body; the test passes as long as the
/// instantiation type-checks.
#[test]
fn test_send() {
    fn assert_send<S: Send>() {}

    assert_send::<ColumnWriterImpl<Int32Type>>();
}
3404
/// Exercises `increment` on three byte vectors: a plain bump of the last
/// byte, a carry through trailing `255`s, and an all-`255` input for which
/// the test expects `None`.
#[test]
fn test_increment() {
    // No carry: only the last byte changes.
    let bumped = increment(vec![0, 0, 0]);
    assert_eq!(&bumped.unwrap(), &[0, 0, 1]);

    // Trailing 255s roll over to 0 and the carry lands in the first byte.
    let carried = increment(vec![0, 255, 255]);
    assert_eq!(&carried.unwrap(), &[1, 0, 0]);

    // Every byte is already 255, so no larger value of the same length exists.
    let overflowed = increment(vec![255, 255, 255]);
    assert!(overflowed.is_none());
}
3418
/// Exercises `increment_utf8` around the boundaries between 1-, 2-, 3- and
/// 4-byte UTF-8 encodings.
///
/// For every successful case the result must be valid UTF-8, equal the
/// expected string, compare greater than the input as a `str`, and compare
/// greater than the input as a `ByteArray`.
#[test]
fn test_increment_utf8() {
    let check = |original: &str, expected: &str| {
        let incremented = match String::from_utf8(increment_utf8(original).unwrap()) {
            Ok(valid) => valid,
            Err(_) => panic!("Expected incremented UTF8 string to also be valid."),
        };
        assert_eq!(incremented, expected);
        assert!(*incremented > *original);

        // The ordering must also hold under ByteArray comparison.
        let mut greater = ByteArray::new();
        greater.set_data(Bytes::from(incremented));
        let mut lesser = ByteArray::new();
        lesser.set_data(Bytes::from(original.as_bytes().to_vec()));
        assert!(greater > lesser);
    };

    // Plain ASCII: the last character is bumped.
    check("hello", "hellp");

    // U+007F is the largest 1-byte code point: the expected result drops it
    // and bumps the preceding character.
    check("a\u{7f}", "b");

    // Nothing but largest 1-byte code points: expected to be non-incrementable.
    assert!(increment_utf8("\u{7f}\u{7f}").is_none());

    // 4-byte characters.
    check("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

    // 2-byte characters.
    check("éééé", "éééê");

    // A 2-byte character whose increment is still 2 bytes wide.
    check("\u{ff}\u{ff}", "\u{ff}\u{100}");

    // U+07FF is the largest 2-byte code point.
    check("a\u{7ff}", "b");
    assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

    // 3-byte characters.
    check("ࠀࠀ", "ࠀࠁ");

    // U+FFFF is the largest 3-byte code point.
    check("a\u{ffff}", "b");
    assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

    // Smallest 4-byte characters.
    check("𐀀𐀀", "𐀀𐀁");

    // U+10FFFF is the largest Unicode code point.
    check("a\u{10ffff}", "b");
    assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

    // U+D7FF is the last code point before the surrogate range.
    check("a\u{D7FF}", "b");
}
3485
/// Exercises `truncate_utf8` and `truncate_and_increment_utf8` on strings of
/// multi-byte characters, checking that results land on the expected
/// character boundaries and that the non-incrementable cases yield `None`.
#[test]
fn test_truncate_utf8() {
    let hearts = "❤️🧡💛💚💙💜";

    // Truncating to the full length leaves the string untouched.
    let untouched = truncate_utf8(hearts, hearts.len()).unwrap();
    assert_eq!(untouched.len(), hearts.len());
    assert_eq!(&untouched, hearts.as_bytes());

    // 13 bytes falls inside a character; the expected result is the 10-byte prefix.
    let shortened = truncate_utf8(hearts, 13).unwrap();
    assert_eq!(shortened.len(), 10);
    assert_eq!(&shortened, "❤️🧡".as_bytes());

    // A single multi-byte character cannot be truncated to one byte.
    let too_short = truncate_utf8("\u{0836}", 1);
    assert!(too_short.is_none());

    // ASCII: truncate to 8 bytes, then bump the last character.
    let ascii = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
    assert_eq!(&ascii, "yyyyyyyz".as_bytes());

    // 2-byte characters with an odd byte budget.
    let two_byte = truncate_and_increment_utf8("ééééé", 7).unwrap();
    assert_eq!(&two_byte, "ééê".as_bytes());

    // 2-byte characters whose increment stays 2 bytes wide.
    let two_byte_bump = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
    assert_eq!(&two_byte_bump, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

    // Only largest 2-byte code points: expected to be non-incrementable.
    let two_byte_max = truncate_and_increment_utf8("߿߿߿߿߿", 8);
    assert!(two_byte_max.is_none());

    // 3-byte characters.
    let three_byte = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
    assert_eq!(&three_byte, "ࠀࠁ".as_bytes());

    // Only largest 3-byte code points: expected to be non-incrementable.
    let three_byte_max = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
    assert!(three_byte_max.is_none());

    // 4-byte characters.
    let four_byte = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
    assert_eq!(&four_byte, "𐀀𐀁".as_bytes());

    // Only the largest code point: expected to be non-incrementable.
    let four_byte_max = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
    assert!(four_byte_max.is_none());
}
3537
/// Writes byte arrays that are not valid UTF-8 (32 bytes of `128`) into a
/// column declared `UTF8`, with chunk statistics enabled and a truncate
/// length of 8, then checks the max statistic in the file metadata.
///
/// The test expects the max to be flagged inexact and to equal the first
/// eight input bytes with the last one incremented.
#[test]
fn test_byte_array_truncate_invalid_utf8_statistics() {
    let message_type = "
        message test_schema {
            OPTIONAL BYTE_ARRAY a (UTF8);
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Seven values to match the seven definition levels equal to 1 below.
    let data = vec![ByteArray::from(vec![128u8; 32]); 7];
    let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];

    let file: File = tempfile::tempfile().unwrap();
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .set_statistics_truncate_length(Some(8))
        .build();

    let mut writer = SerializedFileWriter::new(&file, schema, Arc::new(props)).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();

    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<ByteArrayType>()
        .write_batch(&data, Some(&def_levels), None)
        .unwrap();
    col_writer.close().unwrap();
    row_group_writer.close().unwrap();
    let file_metadata = writer.close().unwrap();

    let column_meta = file_metadata.row_groups[0].columns[0].meta_data.as_ref();
    assert!(column_meta.is_some());
    let stats = column_meta.unwrap().statistics.as_ref().unwrap();

    assert!(!stats.is_max_value_exact.unwrap());
    assert_eq!(
        stats.max_value,
        Some(vec![128, 128, 128, 128, 128, 128, 128, 129])
    );
}
3587
/// Checks `increment` on inputs containing `0xFF` bytes: trailing `0xFF`s are
/// expected to wrap to `0x00` with the carry applied to the first byte that is
/// not `0xFF`, and an all-`0xFF` input is expected to yield `None`.
#[test]
fn test_increment_max_binary_chars() {
    let carried = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
    assert_eq!(&carried.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);

    let overflowed = increment(vec![0xFF, 0xFF, 0xFF]);
    assert!(overflowed.is_none());
}
3596
/// With statistics set to `EnabledStatistics::None`, closing the writer is
/// expected to return an offset index but no column index.
#[test]
fn test_no_column_index_when_stats_disabled() {
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten definition levels of 0 against a max definition level of 1: no values are supplied.
    let def_levels = [0i16; 10];
    writer.write_batch(&[], Some(&def_levels), None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_some());
    assert!(close_result.column_index.is_none());
}
3620
/// With statistics set to `EnabledStatistics::None` and the offset index
/// explicitly disabled, closing the writer is expected to return neither an
/// offset index nor a column index.
#[test]
fn test_no_offset_index_when_disabled() {
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .set_offset_index_disabled(true)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten definition levels of 0 against a max definition level of 1: no values are supplied.
    let def_levels = [0i16; 10];
    writer.write_batch(&[], Some(&def_levels), None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_none());
    assert!(close_result.column_index.is_none());
}
3643
/// With page-level statistics enabled, the test expects the request to
/// disable the offset index to be overridden: both the offset index and the
/// column index must be present after `close()`.
#[test]
fn test_offset_index_overridden() {
    let props = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_offset_index_disabled(true)
        .build();
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        Arc::new(props),
        get_test_page_writer(),
    ));

    // Ten definition levels of 0 against a max definition level of 1: no values are supplied.
    let def_levels = [0i16; 10];
    writer.write_batch(&[], Some(&def_levels), None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_some());
    assert!(close_result.column_index.is_some());
}
3666
/// Checks the `boundary_order` recorded in the column index for several page
/// layouts of an optional `Int32` column, where each inner slice is written
/// and flushed as its own page.
#[test]
fn test_boundary_order() -> Result<()> {
    /// Writes `pages` through `write_multiple_pages` and returns the boundary
    /// order stored in the resulting column index.
    fn boundary_order_of(
        descr: &Arc<ColumnDescriptor>,
        pages: &[&[Option<i32>]],
    ) -> Result<BoundaryOrder> {
        let close_result = write_multiple_pages::<Int32Type>(descr, pages)?;
        Ok(close_result.column_index.unwrap().boundary_order)
    }

    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));

    // Page mins and maxes never decrease; the all-null page is expected not to matter.
    let order = boundary_order_of(
        &descr,
        &[
            &[Some(-10), Some(10)],
            &[Some(-5), Some(11)],
            &[None],
            &[Some(-5), Some(11)],
        ],
    )?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // Page mins and maxes never increase.
    let order = boundary_order_of(
        &descr,
        &[
            &[Some(10), Some(11)],
            &[Some(5), Some(11)],
            &[None],
            &[Some(-5), Some(0)],
        ],
    )?;
    assert_eq!(order, BoundaryOrder::DESCENDING);

    // Identical bounds on every non-null page: expected to be reported as ascending.
    let order = boundary_order_of(
        &descr,
        &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
    )?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // Only null pages.
    let order = boundary_order_of(&descr, &[&[None], &[None], &[None]])?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // A single page.
    let order = boundary_order_of(&descr, &[&[Some(-10), Some(10)]])?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // A single non-null page followed by a null page.
    let order = boundary_order_of(&descr, &[&[Some(-10), Some(10)], &[None]])?;
    assert_eq!(order, BoundaryOrder::ASCENDING);

    // Bounds go up and then down.
    let order = boundary_order_of(
        &descr,
        &[
            &[Some(10), Some(11)],
            &[Some(11), Some(16)],
            &[None],
            &[Some(-5), Some(0)],
        ],
    )?;
    assert_eq!(order, BoundaryOrder::UNORDERED);

    // Mins increase while maxes decrease.
    let order = boundary_order_of(
        &descr,
        &[
            &[Some(1), Some(9)],
            &[Some(2), Some(8)],
            &[None],
            &[Some(3), Some(7)],
        ],
    )?;
    assert_eq!(order, BoundaryOrder::UNORDERED);

    Ok(())
}
3750
/// Writes the same four single-value pages (`f16` 1, 0, -0, -1 encoded as
/// 2-byte fixed-length arrays) into two columns: one annotated with the
/// `Float16` logical type and one plain fixed-length byte array column.
///
/// The test expects `DESCENDING` for the `Float16` column and `UNORDERED` for
/// the plain one, i.e. the logical type changes how page bounds are compared.
#[test]
fn test_boundary_order_logical_type() -> Result<()> {
    let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));

    // Same physical layout as the Float16 column, but without a logical type.
    let plain_type =
        SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
            .with_length(2)
            .build()?;
    let fba_descr = Arc::new(ColumnDescriptor::new(
        Arc::new(plain_type),
        1,
        0,
        ColumnPath::from("col"),
    ));

    let page_value = |v: f16| Some(FixedLenByteArray::from(ByteArray::from(v)));
    let values: &[&[Option<FixedLenByteArray>]] = &[
        &[page_value(f16::ONE)],
        &[page_value(f16::ZERO)],
        &[page_value(f16::NEG_ZERO)],
        &[page_value(f16::NEG_ONE)],
    ];

    let f16_result = write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
    assert_eq!(
        f16_result.column_index.unwrap().boundary_order,
        BoundaryOrder::DESCENDING
    );

    let fba_result = write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
    assert_eq!(
        fba_result.column_index.unwrap().boundary_order,
        BoundaryOrder::UNORDERED
    );

    Ok(())
}
3794
/// Writes three 12-byte values to an `INTERVAL` column and expects the
/// resulting `FixedLenByteArray` statistics to carry neither a min nor a max.
#[test]
fn test_interval_stats_should_not_have_min_max() {
    // Three 12-byte values that differ only in the last byte (0, 1, 2).
    let input: Vec<FixedLenByteArray> = (0u8..3)
        .map(|last_byte| {
            let mut bytes = vec![0u8; 12];
            bytes[11] = last_byte;
            ByteArray::from(bytes).into()
        })
        .collect();

    let mut writer = get_test_interval_column_writer(get_test_page_writer());
    writer.write_batch(&input, None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let stats = match metadata.statistics() {
        Some(Statistics::FixedLenByteArray(inner)) => inner.clone(),
        _ => panic!("metadata missing statistics"),
    };
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
}
3819
/// Tracks `get_estimated_total_bytes` as data pages are added: 0 before any
/// write, 20 after the first four-value page, and 20 + 21 after the second.
///
/// Only compiled when the `arrow` feature is enabled.
#[test]
#[cfg(feature = "arrow")]
fn test_column_writer_get_estimated_total_bytes() {
    let mut writer = get_test_column_writer::<Int32Type>(
        get_test_page_writer(),
        0,
        0,
        Default::default(),
    );
    assert_eq!(writer.get_estimated_total_bytes(), 0);

    // First page.
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.add_data_page().unwrap();
    assert_eq!(writer.get_estimated_total_bytes(), 20);

    // Second page: the estimate is cumulative.
    writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
    writer.add_data_page().unwrap();
    assert_eq!(writer.get_estimated_total_bytes(), 20 + 21);
}
3839
3840 fn write_multiple_pages<T: DataType>(
3841 column_descr: &Arc<ColumnDescriptor>,
3842 pages: &[&[Option<T::T>]],
3843 ) -> Result<ColumnCloseResult> {
3844 let column_writer = get_column_writer(
3845 column_descr.clone(),
3846 Default::default(),
3847 get_test_page_writer(),
3848 );
3849 let mut writer = get_typed_column_writer::<T>(column_writer);
3850
3851 for &page in pages {
3852 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3853 let def_levels = page
3854 .iter()
3855 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3856 .collect::<Vec<_>>();
3857 writer.write_batch(&values, Some(&def_levels), None)?;
3858 writer.flush_data_pages()?;
3859 }
3860
3861 writer.close()
3862 }
3863
3864 fn column_roundtrip_random<T: DataType>(
3868 props: WriterProperties,
3869 max_size: usize,
3870 min_value: T::T,
3871 max_value: T::T,
3872 max_def_level: i16,
3873 max_rep_level: i16,
3874 ) where
3875 T::T: PartialOrd + SampleUniform + Copy,
3876 {
3877 let mut num_values: usize = 0;
3878
3879 let mut buf: Vec<i16> = Vec::new();
3880 let def_levels = if max_def_level > 0 {
3881 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3882 for &dl in &buf[..] {
3883 if dl == max_def_level {
3884 num_values += 1;
3885 }
3886 }
3887 Some(&buf[..])
3888 } else {
3889 num_values = max_size;
3890 None
3891 };
3892
3893 let mut buf: Vec<i16> = Vec::new();
3894 let rep_levels = if max_rep_level > 0 {
3895 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3896 buf[0] = 0; Some(&buf[..])
3898 } else {
3899 None
3900 };
3901
3902 let mut values: Vec<T::T> = Vec::new();
3903 random_numbers_range(num_values, min_value, max_value, &mut values);
3904
3905 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3906 }
3907
3908 fn column_roundtrip<T: DataType>(
3910 props: WriterProperties,
3911 values: &[T::T],
3912 def_levels: Option<&[i16]>,
3913 rep_levels: Option<&[i16]>,
3914 ) {
3915 let mut file = tempfile::tempfile().unwrap();
3916 let mut write = TrackedWrite::new(&mut file);
3917 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3918
3919 let max_def_level = match def_levels {
3920 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3921 None => 0i16,
3922 };
3923
3924 let max_rep_level = match rep_levels {
3925 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3926 None => 0i16,
3927 };
3928
3929 let mut max_batch_size = values.len();
3930 if let Some(levels) = def_levels {
3931 max_batch_size = max_batch_size.max(levels.len());
3932 }
3933 if let Some(levels) = rep_levels {
3934 max_batch_size = max_batch_size.max(levels.len());
3935 }
3936
3937 let mut writer =
3938 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3939
3940 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3941 assert_eq!(values_written, values.len());
3942 let result = writer.close().unwrap();
3943
3944 drop(write);
3945
3946 let props = ReaderProperties::builder()
3947 .set_backward_compatible_lz4(false)
3948 .build();
3949 let page_reader = Box::new(
3950 SerializedPageReader::new_with_properties(
3951 Arc::new(file),
3952 &result.metadata,
3953 result.rows_written as usize,
3954 None,
3955 Arc::new(props),
3956 )
3957 .unwrap(),
3958 );
3959 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3960
3961 let mut actual_values = Vec::with_capacity(max_batch_size);
3962 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3963 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3964
3965 let (_, values_read, levels_read) = reader
3966 .read_records(
3967 max_batch_size,
3968 actual_def_levels.as_mut(),
3969 actual_rep_levels.as_mut(),
3970 &mut actual_values,
3971 )
3972 .unwrap();
3973
3974 assert_eq!(&actual_values[..values_read], values);
3977 match actual_def_levels {
3978 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3979 None => assert_eq!(None, def_levels),
3980 }
3981 match actual_rep_levels {
3982 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3983 None => assert_eq!(None, rep_levels),
3984 }
3985
3986 if let Some(levels) = actual_rep_levels {
3989 let mut actual_rows_written = 0;
3990 for l in levels {
3991 if l == 0 {
3992 actual_rows_written += 1;
3993 }
3994 }
3995 assert_eq!(actual_rows_written, result.rows_written);
3996 } else if actual_def_levels.is_some() {
3997 assert_eq!(levels_read as u64, result.rows_written);
3998 } else {
3999 assert_eq!(values_read as u64, result.rows_written);
4000 }
4001 }
4002
4003 fn column_write_and_get_metadata<T: DataType>(
4006 props: WriterProperties,
4007 values: &[T::T],
4008 ) -> ColumnChunkMetaData {
4009 let page_writer = get_test_page_writer();
4010 let props = Arc::new(props);
4011 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4012 writer.write_batch(values, None, None).unwrap();
4013 writer.close().unwrap().metadata
4014 }
4015
4016 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4018 PageEncodingStats {
4019 page_type,
4020 encoding,
4021 count,
4022 }
4023 }
4024
4025 fn check_encoding_write_support<T: DataType>(
4029 version: WriterVersion,
4030 dict_enabled: bool,
4031 data: &[T::T],
4032 dictionary_page_offset: Option<i64>,
4033 encodings: &[Encoding],
4034 page_encoding_stats: &[PageEncodingStats],
4035 ) {
4036 let props = WriterProperties::builder()
4037 .set_writer_version(version)
4038 .set_dictionary_enabled(dict_enabled)
4039 .build();
4040 let meta = column_write_and_get_metadata::<T>(props, data);
4041 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4042 assert_eq!(meta.encodings(), encodings);
4043 assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4044 }
4045
4046 fn get_test_column_writer<'a, T: DataType>(
4048 page_writer: Box<dyn PageWriter + 'a>,
4049 max_def_level: i16,
4050 max_rep_level: i16,
4051 props: WriterPropertiesPtr,
4052 ) -> ColumnWriterImpl<'a, T> {
4053 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4054 let column_writer = get_column_writer(descr, props, page_writer);
4055 get_typed_column_writer::<T>(column_writer)
4056 }
4057
4058 fn get_test_column_reader<T: DataType>(
4060 page_reader: Box<dyn PageReader>,
4061 max_def_level: i16,
4062 max_rep_level: i16,
4063 ) -> ColumnReaderImpl<T> {
4064 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4065 let column_reader = get_column_reader(descr, page_reader);
4066 get_typed_column_reader::<T>(column_reader)
4067 }
4068
4069 fn get_test_column_descr<T: DataType>(
4071 max_def_level: i16,
4072 max_rep_level: i16,
4073 ) -> ColumnDescriptor {
4074 let path = ColumnPath::from("col");
4075 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4076 .with_length(1)
4079 .build()
4080 .unwrap();
4081 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4082 }
4083
4084 fn get_test_page_writer() -> Box<dyn PageWriter> {
4086 Box::new(TestPageWriter {})
4087 }
4088
4089 struct TestPageWriter {}
4090
4091 impl PageWriter for TestPageWriter {
4092 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4093 let mut res = PageWriteSpec::new();
4094 res.page_type = page.page_type();
4095 res.uncompressed_size = page.uncompressed_size();
4096 res.compressed_size = page.compressed_size();
4097 res.num_values = page.num_values();
4098 res.offset = 0;
4099 res.bytes_written = page.data().len() as u64;
4100 Ok(res)
4101 }
4102
4103 fn close(&mut self) -> Result<()> {
4104 Ok(())
4105 }
4106 }
4107
4108 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4110 let page_writer = get_test_page_writer();
4111 let props = Default::default();
4112 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4113 writer.write_batch(values, None, None).unwrap();
4114
4115 let metadata = writer.close().unwrap().metadata;
4116 if let Some(stats) = metadata.statistics() {
4117 stats.clone()
4118 } else {
4119 panic!("metadata missing statistics");
4120 }
4121 }
4122
4123 fn get_test_decimals_column_writer<T: DataType>(
4125 page_writer: Box<dyn PageWriter>,
4126 max_def_level: i16,
4127 max_rep_level: i16,
4128 props: WriterPropertiesPtr,
4129 ) -> ColumnWriterImpl<'static, T> {
4130 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4131 max_def_level,
4132 max_rep_level,
4133 ));
4134 let column_writer = get_column_writer(descr, props, page_writer);
4135 get_typed_column_writer::<T>(column_writer)
4136 }
4137
4138 fn get_test_decimals_column_descr<T: DataType>(
4140 max_def_level: i16,
4141 max_rep_level: i16,
4142 ) -> ColumnDescriptor {
4143 let path = ColumnPath::from("col");
4144 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4145 .with_length(16)
4146 .with_logical_type(Some(LogicalType::Decimal {
4147 scale: 2,
4148 precision: 3,
4149 }))
4150 .with_scale(2)
4151 .with_precision(3)
4152 .build()
4153 .unwrap();
4154 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4155 }
4156
4157 fn float16_statistics_roundtrip(
4158 values: &[FixedLenByteArray],
4159 ) -> ValueStatistics<FixedLenByteArray> {
4160 let page_writer = get_test_page_writer();
4161 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4162 writer.write_batch(values, None, None).unwrap();
4163
4164 let metadata = writer.close().unwrap().metadata;
4165 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4166 stats.clone()
4167 } else {
4168 panic!("metadata missing statistics");
4169 }
4170 }
4171
4172 fn get_test_float16_column_writer(
4173 page_writer: Box<dyn PageWriter>,
4174 props: WriterPropertiesPtr,
4175 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4176 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4177 let column_writer = get_column_writer(descr, props, page_writer);
4178 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4179 }
4180
4181 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4182 let path = ColumnPath::from("col");
4183 let tpe =
4184 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4185 .with_length(2)
4186 .with_logical_type(Some(LogicalType::Float16))
4187 .build()
4188 .unwrap();
4189 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4190 }
4191
4192 fn get_test_interval_column_writer(
4193 page_writer: Box<dyn PageWriter>,
4194 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4195 let descr = Arc::new(get_test_interval_column_descr());
4196 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4197 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4198 }
4199
4200 fn get_test_interval_column_descr() -> ColumnDescriptor {
4201 let path = ColumnPath::from("col");
4202 let tpe =
4203 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4204 .with_length(12)
4205 .with_converted_type(ConvertedType::INTERVAL)
4206 .build()
4207 .unwrap();
4208 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4209 }
4210
4211 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4213 page_writer: Box<dyn PageWriter + 'a>,
4214 max_def_level: i16,
4215 max_rep_level: i16,
4216 props: WriterPropertiesPtr,
4217 ) -> ColumnWriterImpl<'a, T> {
4218 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4219 max_def_level,
4220 max_rep_level,
4221 ));
4222 let column_writer = get_column_writer(descr, props, page_writer);
4223 get_typed_column_writer::<T>(column_writer)
4224 }
4225
4226 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4228 max_def_level: i16,
4229 max_rep_level: i16,
4230 ) -> ColumnDescriptor {
4231 let path = ColumnPath::from("col");
4232 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4233 .with_converted_type(ConvertedType::UINT_32)
4234 .build()
4235 .unwrap();
4236 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4237 }
4238}