1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
40 OffsetIndexBuilder,
41};
42use crate::file::page_encoding_stats::PageEncodingStats;
43use crate::file::properties::{
44 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
45};
46use crate::file::statistics::{Statistics, ValueStatistics};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
/// Dispatches over every `ColumnWriter` variant.
///
/// `$e` is the value to match on, `$i` is the identifier the wrapped typed writer is
/// bound to, and `$b` is the expression evaluated with that binding. The arms are
/// spelled `Self::...`, so the macro can only be expanded where `Self` names the enum
/// that owns these variants.
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
65
/// Column writer for a single column chunk, with one variant per typed
/// [`ColumnWriterImpl`] instantiation.
pub enum ColumnWriter<'a> {
    /// Wraps a writer instantiated with `BoolType`.
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Wraps a writer instantiated with `Int32Type`.
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Wraps a writer instantiated with `Int64Type`.
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Wraps a writer instantiated with `Int96Type`.
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Wraps a writer instantiated with `FloatType`.
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Wraps a writer instantiated with `DoubleType`.
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Wraps a writer instantiated with `ByteArrayType`.
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Wraps a writer instantiated with `FixedLenByteArrayType`.
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
85
86impl ColumnWriter<'_> {
87 #[cfg(feature = "arrow")]
89 pub(crate) fn memory_size(&self) -> usize {
90 downcast_writer!(self, typed, typed.memory_size())
91 }
92
93 #[cfg(feature = "arrow")]
95 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97 }
98
99 pub fn close(self) -> Result<ColumnCloseResult> {
101 downcast_writer!(self, typed, typed.close())
102 }
103}
104
105pub fn get_column_writer<'a>(
107 descr: ColumnDescPtr,
108 props: WriterPropertiesPtr,
109 page_writer: Box<dyn PageWriter + 'a>,
110) -> ColumnWriter<'a> {
111 match descr.physical_type() {
112 Type::BOOLEAN => {
113 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
114 }
115 Type::INT32 => {
116 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
117 }
118 Type::INT64 => {
119 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
120 }
121 Type::INT96 => {
122 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
123 }
124 Type::FLOAT => {
125 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
126 }
127 Type::DOUBLE => {
128 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
129 }
130 Type::BYTE_ARRAY => {
131 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
132 }
133 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
134 ColumnWriterImpl::new(descr, props, page_writer),
135 ),
136 }
137}
138
139pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
144 T::get_column_writer(col_writer).unwrap_or_else(|| {
145 panic!(
146 "Failed to convert column writer into a typed column writer for `{}` type",
147 T::get_physical_type()
148 )
149 })
150}
151
152pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
154 col_writer: &'b ColumnWriter<'a>,
155) -> &'b ColumnWriterImpl<'a, T> {
156 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
157 panic!(
158 "Failed to convert column writer into a typed column writer for `{}` type",
159 T::get_physical_type()
160 )
161 })
162}
163
164pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
166 col_writer: &'a mut ColumnWriter<'b>,
167) -> &'a mut ColumnWriterImpl<'b, T> {
168 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
169 panic!(
170 "Failed to convert column writer into a typed column writer for `{}` type",
171 T::get_physical_type()
172 )
173 })
174}
175
/// Everything produced by closing a column writer (see `GenericColumnWriter::close`).
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written for this column chunk.
    pub bytes_written: u64,
    /// Total number of rows written for this column chunk.
    pub rows_written: u64,
    /// Metadata built for this column chunk.
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter flushed from the value encoder, if it produced one.
    pub bloom_filter: Option<Sbbf>,
    /// Column index; `None` when the column index builder was invalidated.
    pub column_index: Option<ColumnIndex>,
    /// Offset index; `None` when no offset index builder was created.
    pub offset_index: Option<OffsetIndex>,
}
192
/// Counters describing the data page that is currently being buffered.
#[derive(Default)]
struct PageMetrics {
    /// Number of levels buffered for the current page (nulls included).
    num_buffered_values: u32,
    /// Number of rows buffered for the current page.
    num_buffered_rows: u32,
    /// Number of buffered levels whose definition level is below the column maximum.
    num_page_nulls: u64,
    /// Repetition level histogram for the current page, when one is collected.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Definition level histogram for the current page, when one is collected.
    definition_level_histogram: Option<LevelHistogram>,
}
202
203impl PageMetrics {
204 fn new() -> Self {
205 Default::default()
206 }
207
208 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
210 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
211 self
212 }
213
214 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
216 self.definition_level_histogram = LevelHistogram::try_new(max_level);
217 self
218 }
219
220 fn new_page(&mut self) {
223 self.num_buffered_values = 0;
224 self.num_buffered_rows = 0;
225 self.num_page_nulls = 0;
226 self.repetition_level_histogram
227 .as_mut()
228 .map(LevelHistogram::reset);
229 self.definition_level_histogram
230 .as_mut()
231 .map(LevelHistogram::reset);
232 }
233
234 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
236 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
237 rep_hist.update_from_levels(levels);
238 }
239 }
240
241 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
243 if let Some(ref mut def_hist) = self.definition_level_histogram {
244 def_hist.update_from_levels(levels);
245 }
246 }
247}
248
/// Counters and statistics accumulated over the whole column chunk.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    /// Sum of `bytes_written` over all pages handed to the page writer.
    total_bytes_written: u64,
    /// Sum of the row counts of all data pages created so far.
    total_rows_written: u64,
    /// Sum of the uncompressed sizes of all written pages.
    total_uncompressed_size: u64,
    /// Sum of the compressed sizes of all written pages.
    total_compressed_size: u64,
    /// Sum of `num_values` over all written data pages.
    total_num_values: u64,
    /// Offset of the dictionary page, once it has been written.
    dictionary_page_offset: Option<u64>,
    /// Offset of the first written data page.
    data_page_offset: Option<u64>,
    /// Smallest value seen so far, if any.
    min_column_value: Option<T>,
    /// Largest value seen so far, if any.
    max_column_value: Option<T>,
    /// Sum of the per-page null counts.
    num_column_nulls: u64,
    /// Caller-supplied distinct count; cleared when more than one batch is written.
    column_distinct_count: Option<u64>,
    /// Running total of the per-page `variable_length_bytes` reported by the encoder.
    variable_length_bytes: Option<i64>,
    /// Chunk-level repetition level histogram, when one is collected.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Chunk-level definition level histogram, when one is collected.
    definition_level_histogram: Option<LevelHistogram>,
}
267
268impl<T: Default> ColumnMetrics<T> {
269 fn new() -> Self {
270 Default::default()
271 }
272
273 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
275 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
276 self
277 }
278
279 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
281 self.definition_level_histogram = LevelHistogram::try_new(max_level);
282 self
283 }
284
285 fn update_histogram(
287 chunk_histogram: &mut Option<LevelHistogram>,
288 page_histogram: &Option<LevelHistogram>,
289 ) {
290 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
291 chunk_hist.add(page_hist);
292 }
293 }
294
295 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
298 ColumnMetrics::<T>::update_histogram(
299 &mut self.definition_level_histogram,
300 &page_metrics.definition_level_histogram,
301 );
302 ColumnMetrics::<T>::update_histogram(
303 &mut self.repetition_level_histogram,
304 &page_metrics.repetition_level_histogram,
305 );
306 }
307
308 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
310 if let Some(var_bytes) = variable_length_bytes {
311 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
312 }
313 }
314}
315
/// A [`GenericColumnWriter`] that encodes values with [`ColumnValueEncoderImpl`].
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
318
/// Column writer that is generic over the value encoder `E`.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    /// Descriptor of the column being written.
    descr: ColumnDescPtr,
    /// Writer properties consulted for limits, versions and statistics settings.
    props: WriterPropertiesPtr,
    /// Statistics level looked up from `props` for this column at construction time.
    statistics_enabled: EnabledStatistics,

    /// Sink that receives finished pages.
    page_writer: Box<dyn PageWriter + 'a>,
    /// Compression looked up from `props` for this column.
    codec: Compression,
    /// Codec created for `codec`; `None` means pages are stored uncompressed.
    compressor: Option<Box<dyn Codec>>,
    /// Encoder for the column values.
    encoder: E,

    /// Metrics of the page currently being buffered.
    page_metrics: PageMetrics,
    /// Metrics accumulated over the whole column chunk.
    column_metrics: ColumnMetrics<E::T>,

    /// Set of encodings used so far; seeded with `Encoding::RLE`.
    encodings: BTreeSet<Encoding>,
    /// Per page-type/encoding page counts, in write order.
    encoding_stats: Vec<PageEncodingStats>,
    /// Definition levels buffered for the current page.
    def_levels_sink: Vec<i16>,
    /// Repetition levels buffered for the current page.
    rep_levels_sink: Vec<i16>,
    /// Data pages held back while the encoder still has a dictionary.
    data_pages: VecDeque<CompressedPage>,
    /// Builder for the column index.
    column_index_builder: ColumnIndexBuilder,
    /// Builder for the offset index; `None` when the offset index is disabled.
    offset_index_builder: Option<OffsetIndexBuilder>,

    /// `true` while page min/max values have not been seen to decrease.
    data_page_boundary_ascending: bool,
    /// `true` while page min/max values have not been seen to increase.
    data_page_boundary_descending: bool,
    /// Min/max of the most recent page that had statistics.
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
354
355impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
356 pub fn new(
358 descr: ColumnDescPtr,
359 props: WriterPropertiesPtr,
360 page_writer: Box<dyn PageWriter + 'a>,
361 ) -> Self {
362 let codec = props.compression(descr.path());
363 let codec_options = CodecOptionsBuilder::default().build();
364 let compressor = create_codec(codec, &codec_options).unwrap();
365 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
366
367 let statistics_enabled = props.statistics_enabled(descr.path());
368
369 let mut encodings = BTreeSet::new();
370 encodings.insert(Encoding::RLE);
372
373 let mut page_metrics = PageMetrics::new();
374 let mut column_metrics = ColumnMetrics::<E::T>::new();
375
376 if statistics_enabled != EnabledStatistics::None {
378 page_metrics = page_metrics
379 .with_repetition_level_histogram(descr.max_rep_level())
380 .with_definition_level_histogram(descr.max_def_level());
381 column_metrics = column_metrics
382 .with_repetition_level_histogram(descr.max_rep_level())
383 .with_definition_level_histogram(descr.max_def_level())
384 }
385
386 let mut column_index_builder = ColumnIndexBuilder::new();
388 if statistics_enabled != EnabledStatistics::Page {
389 column_index_builder.to_invalid()
390 }
391
392 let offset_index_builder = match props.offset_index_disabled() {
394 false => Some(OffsetIndexBuilder::new()),
395 _ => None,
396 };
397
398 Self {
399 descr,
400 props,
401 statistics_enabled,
402 page_writer,
403 codec,
404 compressor,
405 encoder,
406 def_levels_sink: vec![],
407 rep_levels_sink: vec![],
408 data_pages: VecDeque::new(),
409 page_metrics,
410 column_metrics,
411 column_index_builder,
412 offset_index_builder,
413 encodings,
414 encoding_stats: vec![],
415 data_page_boundary_ascending: true,
416 data_page_boundary_descending: true,
417 last_non_null_data_page_min_max: None,
418 }
419 }
420
421 #[allow(clippy::too_many_arguments)]
422 pub(crate) fn write_batch_internal(
423 &mut self,
424 values: &E::Values,
425 value_indices: Option<&[usize]>,
426 def_levels: Option<&[i16]>,
427 rep_levels: Option<&[i16]>,
428 min: Option<&E::T>,
429 max: Option<&E::T>,
430 distinct_count: Option<u64>,
431 ) -> Result<usize> {
432 if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
434 if def.len() != rep.len() {
435 return Err(general_err!(
436 "Inconsistent length of definition and repetition levels: {} != {}",
437 def.len(),
438 rep.len()
439 ));
440 }
441 }
442
443 let num_levels = match def_levels {
454 Some(def_levels) => def_levels.len(),
455 None => values.len(),
456 };
457
458 if let Some(min) = min {
459 update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
460 }
461 if let Some(max) = max {
462 update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
463 }
464
465 if self.encoder.num_values() == 0 {
467 self.column_metrics.column_distinct_count = distinct_count;
468 } else {
469 self.column_metrics.column_distinct_count = None;
470 }
471
472 let mut values_offset = 0;
473 let mut levels_offset = 0;
474 let base_batch_size = self.props.write_batch_size();
475 while levels_offset < num_levels {
476 let mut end_offset = num_levels.min(levels_offset + base_batch_size);
477
478 if let Some(r) = rep_levels {
480 while end_offset < r.len() && r[end_offset] != 0 {
481 end_offset += 1;
482 }
483 }
484
485 values_offset += self.write_mini_batch(
486 values,
487 values_offset,
488 value_indices,
489 end_offset - levels_offset,
490 def_levels.map(|lv| &lv[levels_offset..end_offset]),
491 rep_levels.map(|lv| &lv[levels_offset..end_offset]),
492 )?;
493 levels_offset = end_offset;
494 }
495
496 Ok(values_offset)
498 }
499
    /// Writes `values` with optional definition and repetition levels.
    ///
    /// Delegates to `write_batch_internal` without gather indices and without
    /// caller-supplied statistics. Returns the number of values written.
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }
520
    /// Writes `values` like `write_batch`, additionally passing caller-supplied
    /// `min`, `max` and `distinct_count` through to `write_batch_internal`.
    ///
    /// Returns the number of values written.
    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }
547
548 #[cfg(feature = "arrow")]
553 pub(crate) fn memory_size(&self) -> usize {
554 self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
555 }
556
    /// Returns the total number of bytes written by this column writer so far.
    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }
565
566 #[cfg(feature = "arrow")]
572 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
573 self.data_pages
574 .iter()
575 .map(|page| page.data().len() as u64)
576 .sum::<u64>()
577 + self.column_metrics.total_bytes_written
578 + self.encoder.estimated_data_page_size() as u64
579 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
580 }
581
    /// Returns the total number of rows accounted for by finished data pages so far.
    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }
587
    /// Returns a reference to the descriptor of the column being written.
    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }
592
593 pub fn close(mut self) -> Result<ColumnCloseResult> {
596 if self.page_metrics.num_buffered_values > 0 {
597 self.add_data_page()?;
598 }
599 if self.encoder.has_dictionary() {
600 self.write_dictionary_page()?;
601 }
602 self.flush_data_pages()?;
603 let metadata = self.build_column_metadata()?;
604 self.page_writer.close()?;
605
606 let boundary_order = match (
607 self.data_page_boundary_ascending,
608 self.data_page_boundary_descending,
609 ) {
610 (true, _) => BoundaryOrder::ASCENDING,
613 (false, true) => BoundaryOrder::DESCENDING,
614 (false, false) => BoundaryOrder::UNORDERED,
615 };
616 self.column_index_builder.set_boundary_order(boundary_order);
617
618 let column_index = self
619 .column_index_builder
620 .valid()
621 .then(|| self.column_index_builder.build_to_thrift());
622
623 let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
624
625 Ok(ColumnCloseResult {
626 bytes_written: self.column_metrics.total_bytes_written,
627 rows_written: self.column_metrics.total_rows_written,
628 bloom_filter: self.encoder.flush_bloom_filter(),
629 metadata,
630 column_index,
631 offset_index,
632 })
633 }
634
635 fn write_mini_batch(
639 &mut self,
640 values: &E::Values,
641 values_offset: usize,
642 value_indices: Option<&[usize]>,
643 num_levels: usize,
644 def_levels: Option<&[i16]>,
645 rep_levels: Option<&[i16]>,
646 ) -> Result<usize> {
647 let values_to_write = if self.descr.max_def_level() > 0 {
649 let levels = def_levels.ok_or_else(|| {
650 general_err!(
651 "Definition levels are required, because max definition level = {}",
652 self.descr.max_def_level()
653 )
654 })?;
655
656 let values_to_write = levels
657 .iter()
658 .map(|level| (*level == self.descr.max_def_level()) as usize)
659 .sum();
660 self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
661
662 self.page_metrics.update_definition_level_histogram(levels);
664
665 self.def_levels_sink.extend_from_slice(levels);
666 values_to_write
667 } else {
668 num_levels
669 };
670
671 if self.descr.max_rep_level() > 0 {
673 let levels = rep_levels.ok_or_else(|| {
675 general_err!(
676 "Repetition levels are required, because max repetition level = {}",
677 self.descr.max_rep_level()
678 )
679 })?;
680
681 if !levels.is_empty() && levels[0] != 0 {
682 return Err(general_err!(
683 "Write must start at a record boundary, got non-zero repetition level of {}",
684 levels[0]
685 ));
686 }
687
688 for &level in levels {
690 self.page_metrics.num_buffered_rows += (level == 0) as u32
691 }
692
693 self.page_metrics.update_repetition_level_histogram(levels);
695
696 self.rep_levels_sink.extend_from_slice(levels);
697 } else {
698 self.page_metrics.num_buffered_rows += num_levels as u32;
701 }
702
703 match value_indices {
704 Some(indices) => {
705 let indices = &indices[values_offset..values_offset + values_to_write];
706 self.encoder.write_gather(values, indices)?;
707 }
708 None => self.encoder.write(values, values_offset, values_to_write)?,
709 }
710
711 self.page_metrics.num_buffered_values += num_levels as u32;
712
713 if self.should_add_data_page() {
714 self.add_data_page()?;
715 }
716
717 if self.should_dict_fallback() {
718 self.dict_fallback()?;
719 }
720
721 Ok(values_to_write)
722 }
723
724 #[inline]
729 fn should_dict_fallback(&self) -> bool {
730 match self.encoder.estimated_dict_page_size() {
731 Some(size) => {
732 size >= self
733 .props
734 .column_dictionary_page_size_limit(self.descr.path())
735 }
736 None => false,
737 }
738 }
739
740 #[inline]
742 fn should_add_data_page(&self) -> bool {
743 if self.page_metrics.num_buffered_values == 0 {
748 return false;
749 }
750
751 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
752 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
753 }
754
755 fn dict_fallback(&mut self) -> Result<()> {
758 if self.page_metrics.num_buffered_values > 0 {
760 self.add_data_page()?;
761 }
762 self.write_dictionary_page()?;
763 self.flush_data_pages()?;
764 Ok(())
765 }
766
767 fn update_column_offset_index(
769 &mut self,
770 page_statistics: Option<&ValueStatistics<E::T>>,
771 page_variable_length_bytes: Option<i64>,
772 ) {
773 let null_page =
775 (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
776 if null_page && self.column_index_builder.valid() {
779 self.column_index_builder.append(
780 null_page,
781 vec![],
782 vec![],
783 self.page_metrics.num_page_nulls as i64,
784 );
785 } else if self.column_index_builder.valid() {
786 match &page_statistics {
789 None => {
790 self.column_index_builder.to_invalid();
791 }
792 Some(stat) => {
793 let new_min = stat.min_opt().unwrap();
795 let new_max = stat.max_opt().unwrap();
796 if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
797 if self.data_page_boundary_ascending {
798 let not_ascending = compare_greater(&self.descr, last_min, new_min)
800 || compare_greater(&self.descr, last_max, new_max);
801 if not_ascending {
802 self.data_page_boundary_ascending = false;
803 }
804 }
805
806 if self.data_page_boundary_descending {
807 let not_descending = compare_greater(&self.descr, new_min, last_min)
809 || compare_greater(&self.descr, new_max, last_max);
810 if not_descending {
811 self.data_page_boundary_descending = false;
812 }
813 }
814 }
815 self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
816
817 if self.can_truncate_value() {
818 self.column_index_builder.append(
819 null_page,
820 self.truncate_min_value(
821 self.props.column_index_truncate_length(),
822 stat.min_bytes_opt().unwrap(),
823 )
824 .0,
825 self.truncate_max_value(
826 self.props.column_index_truncate_length(),
827 stat.max_bytes_opt().unwrap(),
828 )
829 .0,
830 self.page_metrics.num_page_nulls as i64,
831 );
832 } else {
833 self.column_index_builder.append(
834 null_page,
835 stat.min_bytes_opt().unwrap().to_vec(),
836 stat.max_bytes_opt().unwrap().to_vec(),
837 self.page_metrics.num_page_nulls as i64,
838 );
839 }
840 }
841 }
842 }
843
844 self.column_index_builder.append_histograms(
846 &self.page_metrics.repetition_level_histogram,
847 &self.page_metrics.definition_level_histogram,
848 );
849
850 if let Some(builder) = self.offset_index_builder.as_mut() {
852 builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
853 builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
854 }
855 }
856
857 fn can_truncate_value(&self) -> bool {
859 match self.descr.physical_type() {
860 Type::FIXED_LEN_BYTE_ARRAY
864 if !matches!(
865 self.descr.logical_type(),
866 Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
867 ) =>
868 {
869 true
870 }
871 Type::BYTE_ARRAY => true,
872 _ => false,
874 }
875 }
876
877 fn is_utf8(&self) -> bool {
879 self.get_descriptor().logical_type() == Some(LogicalType::String)
880 || self.get_descriptor().converted_type() == ConvertedType::UTF8
881 }
882
883 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
894 truncation_length
895 .filter(|l| data.len() > *l)
896 .and_then(|l|
897 if self.is_utf8() {
899 match str::from_utf8(data) {
900 Ok(str_data) => truncate_utf8(str_data, l),
901 Err(_) => Some(data[..l].to_vec()),
902 }
903 } else {
904 Some(data[..l].to_vec())
905 }
906 )
907 .map(|truncated| (truncated, true))
908 .unwrap_or_else(|| (data.to_vec(), false))
909 }
910
911 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
925 truncation_length
926 .filter(|l| data.len() > *l)
927 .and_then(|l|
928 if self.is_utf8() {
930 match str::from_utf8(data) {
931 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
932 Err(_) => increment(data[..l].to_vec()),
933 }
934 } else {
935 increment(data[..l].to_vec())
936 }
937 )
938 .map(|truncated| (truncated, true))
939 .unwrap_or_else(|| (data.to_vec(), false))
940 }
941
942 fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
945 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
946 match statistics {
947 Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
948 let (min, did_truncate_min) = self.truncate_min_value(
949 self.props.statistics_truncate_length(),
950 stats.min_bytes_opt().unwrap(),
951 );
952 let (max, did_truncate_max) = self.truncate_max_value(
953 self.props.statistics_truncate_length(),
954 stats.max_bytes_opt().unwrap(),
955 );
956 Statistics::ByteArray(
957 ValueStatistics::new(
958 Some(min.into()),
959 Some(max.into()),
960 stats.distinct_count(),
961 stats.null_count_opt(),
962 backwards_compatible_min_max,
963 )
964 .with_max_is_exact(!did_truncate_max)
965 .with_min_is_exact(!did_truncate_min),
966 )
967 }
968 Statistics::FixedLenByteArray(stats)
969 if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
970 {
971 let (min, did_truncate_min) = self.truncate_min_value(
972 self.props.statistics_truncate_length(),
973 stats.min_bytes_opt().unwrap(),
974 );
975 let (max, did_truncate_max) = self.truncate_max_value(
976 self.props.statistics_truncate_length(),
977 stats.max_bytes_opt().unwrap(),
978 );
979 Statistics::FixedLenByteArray(
980 ValueStatistics::new(
981 Some(min.into()),
982 Some(max.into()),
983 stats.distinct_count(),
984 stats.null_count_opt(),
985 backwards_compatible_min_max,
986 )
987 .with_max_is_exact(!did_truncate_max)
988 .with_min_is_exact(!did_truncate_min),
989 )
990 }
991 stats => stats,
992 }
993 }
994
    /// Turns the currently buffered levels and values into a data page.
    ///
    /// The page is either queued in `data_pages` (while the encoder still has a
    /// dictionary) or written directly. Chunk-level statistics, the column and
    /// offset index and the level histograms are updated on the way, and the
    /// per-page state is reset at the end.
    fn add_data_page(&mut self) -> Result<()> {
        // Take the encoded values (and their min/max) out of the encoder.
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                // Fold the page min/max into the chunk-level min/max.
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                // Page statistics are only kept when page-level statistics are enabled.
                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        // Record this page in the column index and offset index builders.
        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        // Fold the page histograms and variable-length byte count into the chunk metrics.
        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        // From here on the statistics are only needed if they go into the page header.
        let page_statistics = page_statistics
            .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
            .map(|stats| self.truncate_statistics(Statistics::from(stats)));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                // Page layout: repetition levels, definition levels, then values;
                // the whole buffer is compressed together.
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                // Only the values are compressed here; the encoded levels already
                // in `buffer` stay as they are.
                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        // While the encoder still has a dictionary the page is held back;
        // otherwise it goes straight to the page writer.
        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        // Reset the per-page state for the next page.
        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }
1147
    /// Finishes the current page if anything is buffered, then writes every queued
    /// data page to the page writer in FIFO order.
    ///
    /// Stops at the first write error; pages not yet popped stay in the queue.
    #[inline]
    fn flush_data_pages(&mut self) -> Result<()> {
        // Make sure values that are still buffered end up in a page first.
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }

        while let Some(page) = self.data_pages.pop_front() {
            self.write_data_page(page)?;
        }

        Ok(())
    }
1163
1164 fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1166 let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1167 let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1168 let num_values = self.column_metrics.total_num_values as i64;
1169 let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1170 let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1172
1173 let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1174 .set_compression(self.codec)
1175 .set_encodings(self.encodings.iter().cloned().collect())
1176 .set_page_encoding_stats(self.encoding_stats.clone())
1177 .set_total_compressed_size(total_compressed_size)
1178 .set_total_uncompressed_size(total_uncompressed_size)
1179 .set_num_values(num_values)
1180 .set_data_page_offset(data_page_offset)
1181 .set_dictionary_page_offset(dict_page_offset);
1182
1183 if self.statistics_enabled != EnabledStatistics::None {
1184 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1185
1186 let statistics = ValueStatistics::<E::T>::new(
1187 self.column_metrics.min_column_value.clone(),
1188 self.column_metrics.max_column_value.clone(),
1189 self.column_metrics.column_distinct_count,
1190 Some(self.column_metrics.num_column_nulls),
1191 false,
1192 )
1193 .with_backwards_compatible_min_max(backwards_compatible_min_max)
1194 .into();
1195
1196 let statistics = self.truncate_statistics(statistics);
1197
1198 builder = builder
1199 .set_statistics(statistics)
1200 .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1201 .set_repetition_level_histogram(
1202 self.column_metrics.repetition_level_histogram.take(),
1203 )
1204 .set_definition_level_histogram(
1205 self.column_metrics.definition_level_histogram.take(),
1206 );
1207 }
1208
1209 builder = self.set_column_chunk_encryption_properties(builder);
1210
1211 let metadata = builder.build()?;
1212 Ok(metadata)
1213 }
1214
1215 #[inline]
1217 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1218 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1219 encoder.put(levels);
1220 encoder.consume()
1221 }
1222
1223 #[inline]
1226 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1227 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1228 encoder.put(levels);
1229 encoder.consume()
1230 }
1231
1232 #[inline]
1234 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1235 self.encodings.insert(page.encoding());
1236 match self.encoding_stats.last_mut() {
1237 Some(encoding_stats)
1238 if encoding_stats.page_type == page.page_type()
1239 && encoding_stats.encoding == page.encoding() =>
1240 {
1241 encoding_stats.count += 1;
1242 }
1243 _ => {
1244 self.encoding_stats.push(PageEncodingStats {
1247 page_type: page.page_type(),
1248 encoding: page.encoding(),
1249 count: 1,
1250 });
1251 }
1252 }
1253 let page_spec = self.page_writer.write_page(page)?;
1254 if let Some(builder) = self.offset_index_builder.as_mut() {
1257 builder
1258 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1259 }
1260 self.update_metrics_for_page(page_spec);
1261 Ok(())
1262 }
1263
1264 #[inline]
1266 fn write_dictionary_page(&mut self) -> Result<()> {
1267 let compressed_page = {
1268 let mut page = self
1269 .encoder
1270 .flush_dict_page()?
1271 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1272
1273 let uncompressed_size = page.buf.len();
1274
1275 if let Some(ref mut cmpr) = self.compressor {
1276 let mut output_buf = Vec::with_capacity(uncompressed_size);
1277 cmpr.compress(&page.buf, &mut output_buf)?;
1278 page.buf = Bytes::from(output_buf);
1279 }
1280
1281 let dict_page = Page::DictionaryPage {
1282 buf: page.buf,
1283 num_values: page.num_values as u32,
1284 encoding: self.props.dictionary_page_encoding(),
1285 is_sorted: page.is_sorted,
1286 };
1287 CompressedPage::new(dict_page, uncompressed_size)
1288 };
1289
1290 self.encodings.insert(compressed_page.encoding());
1291 self.encoding_stats.push(PageEncodingStats {
1292 page_type: PageType::DICTIONARY_PAGE,
1293 encoding: compressed_page.encoding(),
1294 count: 1,
1295 });
1296 let page_spec = self.page_writer.write_page(compressed_page)?;
1297 self.update_metrics_for_page(page_spec);
1298 Ok(())
1300 }
1301
1302 #[inline]
1304 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1305 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1306 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1307 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1308
1309 match page_spec.page_type {
1310 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1311 self.column_metrics.total_num_values += page_spec.num_values as u64;
1312 if self.column_metrics.data_page_offset.is_none() {
1313 self.column_metrics.data_page_offset = Some(page_spec.offset);
1314 }
1315 }
1316 PageType::DICTIONARY_PAGE => {
1317 assert!(
1318 self.column_metrics.dictionary_page_offset.is_none(),
1319 "Dictionary offset is already set"
1320 );
1321 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1322 }
1323 _ => {}
1324 }
1325 }
1326
/// Attaches column crypto metadata to `builder` when the writer properties carry
/// file encryption properties; otherwise returns `builder` unchanged.
#[inline]
#[cfg(feature = "encryption")]
fn set_column_chunk_encryption_properties(
    &self,
    builder: ColumnChunkMetaDataBuilder,
) -> ColumnChunkMetaDataBuilder {
    match self.props.file_encryption_properties.as_ref() {
        None => builder,
        Some(encryption_properties) => {
            let crypto_metadata = get_column_crypto_metadata(encryption_properties, &self.descr);
            builder.set_column_crypto_metadata(crypto_metadata)
        }
    }
}
1342
/// Returns `builder` unchanged; this variant is compiled only when the
/// `encryption` feature is disabled.
#[inline]
#[cfg(not(feature = "encryption"))]
fn set_column_chunk_encryption_properties(
    &self,
    builder: ColumnChunkMetaDataBuilder,
) -> ColumnChunkMetaDataBuilder {
    builder
}
1351}
1352
1353fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1354 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1355}
1356
1357fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1358 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1359}
1360
1361#[inline]
1362#[allow(clippy::eq_op)]
1363fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1364 match T::PHYSICAL_TYPE {
1365 Type::FLOAT | Type::DOUBLE => val != val,
1366 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1367 let val = val.as_bytes();
1368 let val = f16::from_le_bytes([val[0], val[1]]);
1369 val.is_nan()
1370 }
1371 _ => false,
1372 }
1373}
1374
1375fn update_stat<T: ParquetValueType, F>(
1380 descr: &ColumnDescriptor,
1381 val: &T,
1382 cur: &mut Option<T>,
1383 should_update: F,
1384) where
1385 F: Fn(&T) -> bool,
1386{
1387 if is_nan(descr, val) {
1388 return;
1389 }
1390
1391 if cur.as_ref().is_none_or(should_update) {
1392 *cur = Some(val.clone());
1393 }
1394}
1395
1396fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1398 match T::PHYSICAL_TYPE {
1399 Type::INT32 | Type::INT64 => {
1400 if let Some(LogicalType::Integer {
1401 is_signed: false, ..
1402 }) = descr.logical_type()
1403 {
1404 return compare_greater_unsigned_int(a, b);
1406 }
1407
1408 match descr.converted_type() {
1409 ConvertedType::UINT_8
1410 | ConvertedType::UINT_16
1411 | ConvertedType::UINT_32
1412 | ConvertedType::UINT_64 => {
1413 return compare_greater_unsigned_int(a, b);
1414 }
1415 _ => {}
1416 };
1417 }
1418 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1419 if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1420 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1421 }
1422 if let ConvertedType::DECIMAL = descr.converted_type() {
1423 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1424 }
1425 if let Some(LogicalType::Float16) = descr.logical_type() {
1426 return compare_greater_f16(a.as_bytes(), b.as_bytes());
1427 }
1428 }
1429
1430 _ => {}
1431 }
1432
1433 a > b
1435}
1436
1437fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1445 match (kind, props.writer_version()) {
1446 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1447 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1448 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1449 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1450 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1451 _ => Encoding::PLAIN,
1452 }
1453}
1454
1455fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1457 match (kind, props.writer_version()) {
1458 (Type::BOOLEAN, _) => false,
1460 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1462 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1463 _ => true,
1464 }
1465}
1466
1467#[inline]
1468fn compare_greater_unsigned_int<T: ParquetValueType>(a: &T, b: &T) -> bool {
1469 a.as_u64().unwrap() > b.as_u64().unwrap()
1470}
1471
1472#[inline]
1473fn compare_greater_f16(a: &[u8], b: &[u8]) -> bool {
1474 let a = f16::from_le_bytes(a.try_into().unwrap());
1475 let b = f16::from_le_bytes(b.try_into().unwrap());
1476 a > b
1477}
1478
/// Signed comparison of two big-endian two's-complement byte strings, returning
/// `true` when `a` is greater than `b`.
///
/// The operands may have different lengths; the shorter one is treated as if it
/// were sign-extended to the length of the longer one.
///
/// If either slice is empty the result is simply whether `a` is non-empty.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    if a.is_empty() || b.is_empty() {
        return !a.is_empty();
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Different signs, or equal lengths with different leading bytes, are decided
    // by the leading byte alone. Equal length is required for the second case
    // because a longer value may merely carry sign-extension bytes in front.
    if (0x80 & first_a) != (0x80 & first_b) || (a.len() == b.len() && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    // From here on both values have the same sign.
    let negative = (first_a as i8) < 0;
    let extension: u8 = if negative { 0xFF } else { 0 };

    // Align the operands: strip the extra leading bytes of the longer value. If any
    // of those bytes is not pure sign extension, the longer value has the larger
    // magnitude, which decides the comparison on its own.
    let (a_tail, b_tail) = if a.len() > b.len() {
        let (lead, tail) = a.split_at(a.len() - b.len());
        if lead.iter().any(|&x| x != extension) {
            // `a` has the larger magnitude: greater if positive, smaller if negative.
            return !negative;
        }
        (tail, b)
    } else if b.len() > a.len() {
        let (lead, tail) = b.split_at(b.len() - a.len());
        if lead.iter().any(|&x| x != extension) {
            // `b` has the larger magnitude: `a` is greater only if both are negative.
            return negative;
        }
        (a, tail)
    } else {
        (a, b)
    };

    // Equal-length values of the same sign order like their unsigned bytes.
    a_tail > b_tail
}
1524
/// Returns the bytes of the longest prefix of `data` that ends on a character
/// boundary and is between 1 and `length` bytes long, or `None` when no such
/// prefix exists.
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    (1..=length)
        .rev()
        .find(|&end| data.is_char_boundary(end))
        .map(|end| data.as_bytes()[..end].to_vec())
}
1534
1535fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1541 let lower_bound = length.saturating_sub(3);
1543 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1544 increment_utf8(data.get(..split)?)
1545}
1546
/// Produces the bytes of a string that sorts after `data` by replacing the last
/// character that can be incremented with its successor and dropping everything
/// after it.
///
/// A character is skipped when the next code point is not a valid `char` or would
/// need a different number of UTF-8 bytes. Returns `None` when no character can be
/// incremented.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        // Only accept a successor that occupies exactly the same number of bytes.
        let successor =
            char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;
        let mut bumped = data.as_bytes()[..start + width].to_vec();
        successor.encode_utf8(&mut bumped[start..]);
        Some(bumped)
    })
}
1568
/// Adds one to `data` interpreted as a big-endian unsigned number, carrying
/// through trailing `0xFF` bytes.
///
/// Returns `None` when every byte is `0xFF` (or `data` is empty), i.e. when the
/// increment would overflow the buffer.
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The last byte that is not 0xFF absorbs the carry; all bytes after it wrap to 0.
    let carry_stop = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[carry_stop] += 1;
    data[carry_stop + 1..].fill(0);
    Some(data)
}
1584
1585#[cfg(test)]
1586mod tests {
1587 use crate::{
1588 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1589 schema::parser::parse_message_type,
1590 };
1591 use core::str;
1592 use rand::distr::uniform::SampleUniform;
1593 use std::{fs::File, sync::Arc};
1594
1595 use crate::column::{
1596 page::PageReader,
1597 reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1598 };
1599 use crate::file::writer::TrackedWrite;
1600 use crate::file::{
1601 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1602 };
1603 use crate::schema::types::{ColumnPath, Type as SchemaType};
1604 use crate::util::test_common::rand_gen::random_numbers_range;
1605
1606 use super::*;
1607
// `write_batch` must reject definition and repetition level slices whose lengths differ.
#[test]
fn test_column_writer_inconsistent_def_rep_length() {
    let page_writer = get_test_page_writer();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, Default::default());
    let outcome = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
    match outcome {
        Ok(_) => panic!("write_batch unexpectedly succeeded"),
        Err(err) => assert_eq!(
            err.to_string(),
            "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
        ),
    }
}
1622
// Omitting definition levels must fail when the maximum definition level is 1.
#[test]
fn test_column_writer_invalid_def_levels() {
    let page_writer = get_test_page_writer();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, Default::default());
    let outcome = writer.write_batch(&[1, 2, 3, 4], None, None);
    match outcome {
        Ok(_) => panic!("write_batch unexpectedly succeeded"),
        Err(err) => assert_eq!(
            err.to_string(),
            "Parquet error: Definition levels are required, because max definition level = 1"
        ),
    }
}
1637
// Omitting repetition levels must fail when the maximum repetition level is 1.
#[test]
fn test_column_writer_invalid_rep_levels() {
    let page_writer = get_test_page_writer();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, Default::default());
    let outcome = writer.write_batch(&[1, 2, 3, 4], None, None);
    match outcome {
        Ok(_) => panic!("write_batch unexpectedly succeeded"),
        Err(err) => assert_eq!(
            err.to_string(),
            "Parquet error: Repetition levels are required, because max repetition level = 1"
        ),
    }
}
1652
// Four definition levels of 1 require four values; supplying two must fail.
#[test]
fn test_column_writer_not_enough_values_to_write() {
    let page_writer = get_test_page_writer();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, Default::default());
    let outcome = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
    match outcome {
        Ok(_) => panic!("write_batch unexpectedly succeeded"),
        Err(err) => assert_eq!(
            err.to_string(),
            "Parquet error: Expected to write 4 values, but have only 2"
        ),
    }
}
1667
// After one dictionary page has been written, a second attempt must fail.
#[test]
fn test_column_writer_write_only_one_dictionary_page() {
    let page_writer = get_test_page_writer();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, Default::default());
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.add_data_page().unwrap();

    // First write succeeds, the second one reports the missing encoder.
    writer.write_dictionary_page().unwrap();
    let second_attempt = writer.write_dictionary_page();
    assert_eq!(
        second_attempt.unwrap_err().to_string(),
        "Parquet error: Dictionary encoder is not set"
    );
}
1680
// With dictionary encoding disabled, writing a dictionary page must fail.
#[test]
fn test_column_writer_error_when_writing_disabled_dictionary() {
    let page_writer = get_test_page_writer();
    let builder = WriterProperties::builder().set_dictionary_enabled(false);
    let props = Arc::new(builder.build());
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

    let attempt = writer.write_dictionary_page();
    assert_eq!(
        attempt.unwrap_err().to_string(),
        "Parquet error: Dictionary encoder is not set"
    );
}
1694
// Even with dictionary encoding requested, a boolean column is written without a
// dictionary page.
#[test]
fn test_column_writer_boolean_type_does_not_support_dictionary() {
    let page_writer = get_test_page_writer();
    let builder = WriterProperties::builder().set_dictionary_enabled(true);
    let props = Arc::new(builder.build());
    let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
    let values = [true, false, true, false];
    writer.write_batch(&values, None, None).unwrap();

    let close_result = writer.close().unwrap();
    assert_eq!(close_result.bytes_written, 1);
    assert_eq!(close_result.rows_written, 4);

    let metadata = close_result.metadata;
    assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
    assert_eq!(metadata.num_values(), 4);
    assert_eq!(metadata.dictionary_page_offset(), None);
}
1719
// Boolean columns: the expected encodings depend only on the writer version,
// not on whether dictionary encoding is enabled.
#[test]
fn test_column_writer_default_encoding_support_bool() {
    // Writer version 1: PLAIN data pages, with and without dictionary enabled.
    for dict_enabled in [true, false] {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_1_0,
            dict_enabled,
            &[true, false],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
    }

    // Writer version 2: RLE v2 data pages, with and without dictionary enabled.
    for dict_enabled in [true, false] {
        check_encoding_write_support::<BoolType>(
            WriterVersion::PARQUET_2_0,
            dict_enabled,
            &[true, false],
            None,
            &[Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
        );
    }
}
1755
// INT32 columns: expected encodings and page stats for each writer version, with
// and without dictionary encoding.
#[test]
fn test_column_writer_default_encoding_support_int32() {
    let dict_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let dict_page = || encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1);

    // Version 1, dictionary enabled.
    let v1_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &[1, 2],
        Some(0),
        &dict_encodings,
        &v1_dict_stats,
    );

    // Version 1, dictionary disabled.
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &[1, 2],
        None,
        &[Encoding::PLAIN, Encoding::RLE],
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );

    // Version 2, dictionary enabled.
    let v2_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &[1, 2],
        Some(0),
        &dict_encodings,
        &v2_dict_stats,
    );

    // Version 2, dictionary disabled.
    let v2_plain_stats = [encoding_stats(
        PageType::DATA_PAGE_V2,
        Encoding::DELTA_BINARY_PACKED,
        1,
    )];
    check_encoding_write_support::<Int32Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &[1, 2],
        None,
        &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
        &v2_plain_stats,
    );
}
1801
// INT64 columns: expected encodings and page stats for each writer version, with
// and without dictionary encoding.
#[test]
fn test_column_writer_default_encoding_support_int64() {
    let dict_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let dict_page = || encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1);

    // Version 1, dictionary enabled.
    let v1_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &[1, 2],
        Some(0),
        &dict_encodings,
        &v1_dict_stats,
    );

    // Version 1, dictionary disabled.
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &[1, 2],
        None,
        &[Encoding::PLAIN, Encoding::RLE],
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );

    // Version 2, dictionary enabled.
    let v2_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &[1, 2],
        Some(0),
        &dict_encodings,
        &v2_dict_stats,
    );

    // Version 2, dictionary disabled.
    let v2_plain_stats = [encoding_stats(
        PageType::DATA_PAGE_V2,
        Encoding::DELTA_BINARY_PACKED,
        1,
    )];
    check_encoding_write_support::<Int64Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &[1, 2],
        None,
        &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
        &v2_plain_stats,
    );
}
1847
// INT96 columns: expected encodings and page stats for each writer version, with
// and without dictionary encoding.
#[test]
fn test_column_writer_default_encoding_support_int96() {
    let dict_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_encodings = [Encoding::PLAIN, Encoding::RLE];
    let dict_page = || encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1);

    // Version 1, dictionary enabled.
    let v1_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_1_0,
        true,
        &[Int96::from(vec![1, 2, 3])],
        Some(0),
        &dict_encodings,
        &v1_dict_stats,
    );

    // Version 1, dictionary disabled.
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_1_0,
        false,
        &[Int96::from(vec![1, 2, 3])],
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );

    // Version 2, dictionary enabled.
    let v2_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_2_0,
        true,
        &[Int96::from(vec![1, 2, 3])],
        Some(0),
        &dict_encodings,
        &v2_dict_stats,
    );

    // Version 2, dictionary disabled.
    check_encoding_write_support::<Int96Type>(
        WriterVersion::PARQUET_2_0,
        false,
        &[Int96::from(vec![1, 2, 3])],
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
    );
}
1889
// FLOAT columns: expected encodings and page stats for each writer version, with
// and without dictionary encoding.
#[test]
fn test_column_writer_default_encoding_support_float() {
    let dict_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_encodings = [Encoding::PLAIN, Encoding::RLE];
    let dict_page = || encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1);

    // Version 1, dictionary enabled.
    let v1_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_1_0,
        true,
        &[1.0, 2.0],
        Some(0),
        &dict_encodings,
        &v1_dict_stats,
    );

    // Version 1, dictionary disabled.
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_1_0,
        false,
        &[1.0, 2.0],
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );

    // Version 2, dictionary enabled.
    let v2_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_2_0,
        true,
        &[1.0, 2.0],
        Some(0),
        &dict_encodings,
        &v2_dict_stats,
    );

    // Version 2, dictionary disabled.
    check_encoding_write_support::<FloatType>(
        WriterVersion::PARQUET_2_0,
        false,
        &[1.0, 2.0],
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
    );
}
1931
// DOUBLE columns: expected encodings and page stats for each writer version, with
// and without dictionary encoding.
#[test]
fn test_column_writer_default_encoding_support_double() {
    let dict_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let plain_encodings = [Encoding::PLAIN, Encoding::RLE];
    let dict_page = || encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1);

    // Version 1, dictionary enabled.
    let v1_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_1_0,
        true,
        &[1.0, 2.0],
        Some(0),
        &dict_encodings,
        &v1_dict_stats,
    );

    // Version 1, dictionary disabled.
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_1_0,
        false,
        &[1.0, 2.0],
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );

    // Version 2, dictionary enabled.
    let v2_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_2_0,
        true,
        &[1.0, 2.0],
        Some(0),
        &dict_encodings,
        &v2_dict_stats,
    );

    // Version 2, dictionary disabled.
    check_encoding_write_support::<DoubleType>(
        WriterVersion::PARQUET_2_0,
        false,
        &[1.0, 2.0],
        None,
        &plain_encodings,
        &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
    );
}
1973
// BYTE_ARRAY columns: expected encodings and page stats for each writer version,
// with and without dictionary encoding.
#[test]
fn test_column_writer_default_encoding_support_byte_array() {
    let dict_encodings = [Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY];
    let dict_page = || encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1);

    // Version 1, dictionary enabled.
    let v1_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_1_0,
        true,
        &[ByteArray::from(vec![1u8])],
        Some(0),
        &dict_encodings,
        &v1_dict_stats,
    );

    // Version 1, dictionary disabled.
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_1_0,
        false,
        &[ByteArray::from(vec![1u8])],
        None,
        &[Encoding::PLAIN, Encoding::RLE],
        &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
    );

    // Version 2, dictionary enabled.
    let v2_dict_stats = [
        dict_page(),
        encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_2_0,
        true,
        &[ByteArray::from(vec![1u8])],
        Some(0),
        &dict_encodings,
        &v2_dict_stats,
    );

    // Version 2, dictionary disabled.
    let v2_plain_stats = [encoding_stats(
        PageType::DATA_PAGE_V2,
        Encoding::DELTA_BYTE_ARRAY,
        1,
    )];
    check_encoding_write_support::<ByteArrayType>(
        WriterVersion::PARQUET_2_0,
        false,
        &[ByteArray::from(vec![1u8])],
        None,
        &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
        &v2_plain_stats,
    );
}
2019
// FIXED_LEN_BYTE_ARRAY columns: version 1 expects plain pages regardless of the
// dictionary setting; version 2 expects a dictionary when enabled.
#[test]
fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
    // Version 1: same expectation with dictionary enabled and disabled.
    for dict_enabled in [true, false] {
        check_encoding_write_support::<FixedLenByteArrayType>(
            WriterVersion::PARQUET_1_0,
            dict_enabled,
            &[ByteArray::from(vec![1u8]).into()],
            None,
            &[Encoding::PLAIN, Encoding::RLE],
            &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
        );
    }

    // Version 2, dictionary enabled.
    let v2_dict_stats = [
        encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
        encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
    ];
    check_encoding_write_support::<FixedLenByteArrayType>(
        WriterVersion::PARQUET_2_0,
        true,
        &[ByteArray::from(vec![1u8]).into()],
        Some(0),
        &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
        &v2_dict_stats,
    );

    // Version 2, dictionary disabled.
    let v2_plain_stats = [encoding_stats(
        PageType::DATA_PAGE_V2,
        Encoding::DELTA_BYTE_ARRAY,
        1,
    )];
    check_encoding_write_support::<FixedLenByteArrayType>(
        WriterVersion::PARQUET_2_0,
        false,
        &[ByteArray::from(vec![1u8]).into()],
        None,
        &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
        &v2_plain_stats,
    );
}
2062
// Closing a writer after one batch must report the expected sizes, encodings,
// offsets and min/max statistics in the column chunk metadata.
#[test]
fn test_column_writer_check_metadata() {
    let page_writer = get_test_page_writer();
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, Default::default());
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();

    let close_result = writer.close().unwrap();
    assert_eq!(close_result.bytes_written, 20);
    assert_eq!(close_result.rows_written, 4);

    let metadata = close_result.metadata;
    assert_eq!(
        metadata.encodings(),
        &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
    );
    assert_eq!(metadata.num_values(), 4);
    assert_eq!(metadata.compressed_size(), 20);
    assert_eq!(metadata.uncompressed_size(), 20);
    assert_eq!(metadata.data_page_offset(), 0);
    assert_eq!(metadata.dictionary_page_offset(), Some(0));

    let Some(stats) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);

    let Statistics::Int32(typed) = stats else {
        panic!("expecting Statistics::Int32");
    };
    assert_eq!(typed.min_opt().unwrap(), &1);
    assert_eq!(typed.max_opt().unwrap(), &4);
}
2097
// Min/max statistics for a byte-array decimal column must follow the decimal
// ordering: the first value written is the minimum and the last is the maximum.
#[test]
fn test_column_writer_check_byte_array_min_max() {
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let values = [
        ByteArray::from(vec![
            255u8, 255, 255, 255, 255, 255, 255, 255, 179, 172, 19, 35, 231, 90, 0, 0,
        ]),
        ByteArray::from(vec![
            255u8, 255, 255, 255, 255, 255, 255, 255, 228, 62, 146, 152, 177, 56, 0, 0,
        ]),
        ByteArray::from(vec![0u8; 16]),
        ByteArray::from(vec![
            0u8, 0, 0, 0, 0, 0, 0, 0, 41, 162, 36, 26, 246, 44, 0, 0,
        ]),
    ];
    writer.write_batch(&values, None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let Some(stats) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    let Statistics::ByteArray(typed) = stats else {
        panic!("expecting Statistics::ByteArray");
    };
    assert_eq!(typed.min_opt().unwrap(), &values[0]);
    assert_eq!(typed.max_opt().unwrap(), &values[3]);
}
2151
// Min/max statistics for an INT32 column whose unsigned-ness is given through the
// converted type.
#[test]
fn test_column_writer_uint32_converted_type_min_max() {
    let page_writer = get_test_page_writer();
    let props = Default::default();
    let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
        page_writer,
        0,
        0,
        props,
    );
    writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let Some(stats) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    let Statistics::Int32(typed) = stats else {
        panic!("expecting Statistics::Int32");
    };
    assert_eq!(typed.min_opt().unwrap(), &0);
    assert_eq!(typed.max_opt().unwrap(), &5);
}
2175
// Statistics supplied by the caller must be reported in the chunk metadata as
// given, even though they do not match the written values.
#[test]
fn test_column_writer_precalculated_statistics() {
    let page_writer = get_test_page_writer();
    let builder = WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk);
    let props = Arc::new(builder.build());
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

    let (given_min, given_max, given_distinct) = (-17, 9000, 55);
    writer
        .write_batch_with_statistics(
            &[1, 2, 3, 4],
            None,
            None,
            Some(&given_min),
            Some(&given_max),
            Some(given_distinct),
        )
        .unwrap();

    let close_result = writer.close().unwrap();
    assert_eq!(close_result.bytes_written, 20);
    assert_eq!(close_result.rows_written, 4);

    let metadata = close_result.metadata;
    assert_eq!(
        metadata.encodings(),
        &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
    );
    assert_eq!(metadata.num_values(), 4);
    assert_eq!(metadata.compressed_size(), 20);
    assert_eq!(metadata.uncompressed_size(), 20);
    assert_eq!(metadata.data_page_offset(), 0);
    assert_eq!(metadata.dictionary_page_offset(), Some(0));

    let Some(stats) = metadata.statistics() else {
        panic!("metadata missing statistics");
    };
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);

    let Statistics::Int32(typed) = stats else {
        panic!("expecting Statistics::Int32");
    };
    assert_eq!(typed.min_opt().unwrap(), &-17);
    assert_eq!(typed.max_opt().unwrap(), &9000);
}
2223
// Mixing a batch with computed statistics and a batch with supplied statistics
// must yield combined min/max values and no distinct count, both in the chunk
// metadata and in the data page header.
#[test]
fn test_mixed_precomputed_statistics() {
    let mut buf = Vec::with_capacity(100);
    let mut write = TrackedWrite::new(&mut buf);
    let page_writer = Box::new(SerializedPageWriter::new(&mut write));
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_write_page_header_statistics(true)
            .build(),
    );
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, writer_props);

    // The first batch lets the writer compute statistics; the second supplies them.
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer
        .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
        .unwrap();

    let close_result = writer.close().unwrap();
    let expected_min = 1_i32.to_le_bytes();
    let expected_max = 7_i32.to_le_bytes();

    let chunk_stats = close_result.metadata.statistics().unwrap();
    assert_eq!(chunk_stats.min_bytes_opt().unwrap(), expected_min);
    assert_eq!(chunk_stats.max_bytes_opt().unwrap(), expected_max);
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert!(chunk_stats.distinct_count_opt().is_none());

    // Release the borrow on `buf` before handing the bytes to the reader.
    drop(write);

    let reader_props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .build();
    let reader = SerializedPageReader::new_with_properties(
        Arc::new(Bytes::from(buf)),
        &close_result.metadata,
        close_result.rows_written as usize,
        None,
        Arc::new(reader_props),
    )
    .unwrap();

    let pages = reader.collect::<Result<Vec<_>>>().unwrap();
    assert_eq!(pages.len(), 2);
    assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
    assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

    let page_stats = pages[1].statistics().unwrap();
    assert_eq!(page_stats.min_bytes_opt().unwrap(), expected_min);
    assert_eq!(page_stats.max_bytes_opt().unwrap(), expected_max);
    assert_eq!(page_stats.null_count_opt(), Some(0));
    assert!(page_stats.distinct_count_opt().is_none());
}
2281
// With statistics disabled, neither the chunk metadata nor the v2 data page
// carries statistics, while the page's value, null and row counts are still set.
#[test]
fn test_disabled_statistics() {
    let mut buf = Vec::with_capacity(100);
    let mut write = TrackedWrite::new(&mut buf);
    let page_writer = Box::new(SerializedPageWriter::new(&mut write));
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_writer_version(WriterVersion::PARQUET_2_0)
            .build(),
    );

    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, writer_props);
    let def_levels = [1, 0, 0, 1, 1, 1];
    writer
        .write_batch(&[1, 2, 3, 4], Some(&def_levels), None)
        .unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.metadata.statistics().is_none());

    // Release the borrow on `buf` before handing the bytes to the reader.
    drop(write);

    let reader_props = ReaderProperties::builder()
        .set_backward_compatible_lz4(false)
        .build();
    let reader = SerializedPageReader::new_with_properties(
        Arc::new(Bytes::from(buf)),
        &close_result.metadata,
        close_result.rows_written as usize,
        None,
        Arc::new(reader_props),
    )
    .unwrap();

    let pages = reader.collect::<Result<Vec<_>>>().unwrap();
    assert_eq!(pages.len(), 2);
    assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
    assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

    let Page::DataPageV2 {
        num_values,
        num_nulls,
        num_rows,
        statistics,
        ..
    } = &pages[1]
    else {
        unreachable!()
    };
    assert_eq!(*num_values, 6);
    assert_eq!(*num_nulls, 2);
    assert_eq!(*num_rows, 6);
    assert!(statistics.is_none());
}
2337
// Roundtrip of a column with no values, using default writer properties.
#[test]
fn test_column_writer_empty_column_roundtrip() {
    column_roundtrip::<Int32Type>(Default::default(), &[], None, None);
}
2343
// Random roundtrip with maximum definition and repetition levels of 0.
#[test]
fn test_column_writer_non_nullable_values_roundtrip() {
    column_roundtrip_random::<Int32Type>(Default::default(), 1024, i32::MIN, i32::MAX, 0, 0);
}
2349
/// Round-trips 1024 random `i32` values with a max definition level of 10
/// and a max repetition level of 0, using default writer properties.
#[test]
fn test_column_writer_nullable_non_repeated_values_roundtrip() {
    let default_props = WriterProperties::default();
    column_roundtrip_random::<Int32Type>(default_props, 1024, i32::MIN, i32::MAX, 10, 0);
}
2355
/// Round-trips 1024 random `i32` values with max definition and repetition
/// levels of 10, using default writer properties.
#[test]
fn test_column_writer_nullable_repeated_values_roundtrip() {
    let default_props = WriterProperties::default();
    column_roundtrip_random::<Int32Type>(default_props, 1024, i32::MIN, i32::MAX, 10, 10);
}
2361
/// Round-trips random data with both the dictionary page and data page size
/// limits set to 32.
#[test]
fn test_column_writer_dictionary_fallback_small_data_page() {
    let builder = WriterProperties::builder()
        .set_dictionary_page_size_limit(32)
        .set_data_page_size_limit(32);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2370
/// Round-trips random data once for each of several small write batch sizes.
#[test]
fn test_column_writer_small_write_batch_size() {
    let batch_sizes = [1usize, 2, 5, 10, 11, 1023];
    for batch_size in batch_sizes.iter().copied() {
        let builder = WriterProperties::builder().set_write_batch_size(batch_size);

        column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
    }
}
2379
/// Round-trips random data with dictionary encoding disabled and writer
/// version `PARQUET_1_0`.
#[test]
fn test_column_writer_dictionary_disabled_v1() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_dictionary_enabled(false);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2388
/// Round-trips random data with dictionary encoding disabled and writer
/// version `PARQUET_2_0`.
#[test]
fn test_column_writer_dictionary_disabled_v2() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2397
/// Round-trips 2048 random values with SNAPPY compression and writer
/// version `PARQUET_1_0`.
#[test]
fn test_column_writer_compression_v1() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_compression(Compression::SNAPPY);
    column_roundtrip_random::<Int32Type>(builder.build(), 2048, i32::MIN, i32::MAX, 10, 10);
}
2406
/// Round-trips 2048 random values with SNAPPY compression and writer
/// version `PARQUET_2_0`.
#[test]
fn test_column_writer_compression_v2() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_compression(Compression::SNAPPY);
    column_roundtrip_random::<Int32Type>(builder.build(), 2048, i32::MIN, i32::MAX, 10, 10);
}
2415
/// Writes ten `i32` values with a data page size limit of 10 and a write
/// batch size of 3, then checks the pages read back and the recorded page
/// encoding statistics.
#[test]
fn test_column_writer_add_data_pages_with_dict() {
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_data_page_size_limit(10)
            .set_write_batch_size(3)
            .build(),
    );
    let values = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];

    let mut file = tempfile::tempfile().unwrap();
    let mut write = TrackedWrite::new(&mut file);
    let page_writer = Box::new(SerializedPageWriter::new(&mut write));

    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, writer_props);
    writer.write_batch(values, None, None).unwrap();
    let close_result = writer.close().unwrap();

    // `write` borrows `file` mutably; drop it before `file` is moved below.
    drop(write);

    let reader_props = Arc::new(
        ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build(),
    );
    let mut page_reader = Box::new(
        SerializedPageReader::new_with_properties(
            Arc::new(file),
            &close_result.metadata,
            close_result.rows_written as usize,
            None,
            reader_props,
        )
        .unwrap(),
    );

    // Record (page type, value count, buffer length) for every page read back.
    let mut observed = Vec::new();
    loop {
        let page = match page_reader.get_next_page().unwrap() {
            Some(page) => page,
            None => break,
        };
        observed.push((page.page_type(), page.num_values(), page.buffer().len()));
    }
    let expected_pages = vec![
        (PageType::DICTIONARY_PAGE, 10, 40),
        (PageType::DATA_PAGE, 9, 10),
        (PageType::DATA_PAGE, 1, 3),
    ];
    assert_eq!(observed, expected_pages);

    let expected_encoding_stats = vec![
        PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: Encoding::PLAIN,
            count: 1,
        },
        PageEncodingStats {
            page_type: PageType::DATA_PAGE,
            encoding: Encoding::RLE_DICTIONARY,
            count: 2,
        },
    ];
    assert_eq!(
        close_result.metadata.page_encoding_stats(),
        Some(&expected_encoding_stats)
    );
}
2478
/// Boolean statistics: min is `false`, max is `true`, and the statistics are
/// expected not to be reported as min/max backwards compatible.
#[test]
fn test_bool_statistics() {
    let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Boolean(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &false);
            assert_eq!(typed.max_opt().unwrap(), &true);
        }
        _ => panic!("expecting Statistics::Boolean, got {stats:?}"),
    }
}
2492
/// Int32 statistics: min and max of `[-1, 3, -2, 2]` are `-2` and `3`.
#[test]
fn test_int32_statistics() {
    let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2);
            assert_eq!(typed.max_opt().unwrap(), &3);
        }
        _ => panic!("expecting Statistics::Int32, got {stats:?}"),
    }
}
2504
/// Int64 statistics: min and max of `[-1, 3, -2, 2]` are `-2` and `3`.
#[test]
fn test_int64_statistics() {
    let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Int64(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2);
            assert_eq!(typed.max_opt().unwrap(), &3);
        }
        _ => panic!("expecting Statistics::Int64, got {stats:?}"),
    }
}
2516
/// Int96 statistics: checks the min and max selected from four values and
/// that the statistics are not reported as min/max backwards compatible.
#[test]
fn test_int96_statistics() {
    let input: Vec<Int96> = vec![
        Int96::from(vec![1, 20, 30]),
        Int96::from(vec![3, 20, 10]),
        Int96::from(vec![0, 20, 30]),
        Int96::from(vec![2, 20, 30]),
    ];

    let stats = statistics_roundtrip::<Int96Type>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Int96(typed) => {
            let expected_min = Int96::from(vec![3, 20, 10]);
            assert_eq!(typed.min_opt().unwrap(), &expected_min);
            let expected_max = Int96::from(vec![2, 20, 30]);
            assert_eq!(typed.max_opt().unwrap(), &expected_max);
        }
        _ => panic!("expecting Statistics::Int96, got {stats:?}"),
    }
}
2537
/// Float statistics: min and max of `[-1.0, 3.0, -2.0, 2.0]` are `-2.0` and `3.0`.
#[test]
fn test_float_statistics() {
    let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            assert_eq!(typed.max_opt().unwrap(), &3.0);
        }
        _ => panic!("expecting Statistics::Float, got {stats:?}"),
    }
}
2549
/// Double statistics: min and max of `[-1.0, 3.0, -2.0, 2.0]` are `-2.0` and `3.0`.
#[test]
fn test_double_statistics() {
    let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            assert_eq!(typed.max_opt().unwrap(), &3.0);
        }
        _ => panic!("expecting Statistics::Double, got {stats:?}"),
    }
}
2561
/// Byte array statistics: of the five input strings, min is "aaw" and max
/// is "zz"; the statistics are expected not to be backwards compatible.
#[test]
fn test_byte_array_statistics() {
    let words = ["aawaa", "zz", "aaw", "m", "qrs"];
    let input: Vec<_> = words.iter().map(|&word| word.into()).collect();

    let stats = statistics_roundtrip::<ByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::ByteArray(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &ByteArray::from("aaw"));
            assert_eq!(typed.max_opt().unwrap(), &ByteArray::from("zz"));
        }
        _ => panic!("expecting Statistics::ByteArray, got {stats:?}"),
    }
}
2578
/// Fixed-length byte array statistics: checks the min and max chosen from
/// the five inputs and that the statistics are not backwards compatible.
#[test]
fn test_fixed_len_byte_array_statistics() {
    let words = ["aawaa", "zz ", "aaw ", "m ", "qrs "];
    let input: Vec<_> = words
        .iter()
        .map(|&word| ByteArray::from(word).into())
        .collect();

    let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::FixedLenByteArray(typed) => {
            let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
            assert_eq!(typed.min_opt().unwrap(), &expected_min);
            let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
            assert_eq!(typed.max_opt().unwrap(), &expected_max);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray, got {stats:?}"),
    }
}
2597
/// Float16 statistics over `[-1, 3, -2, 2]`: min is `-2` and max is `3`.
#[test]
fn test_column_writer_check_float16_min_max() {
    let two = f16::from_f32(2.0);
    let three = f16::from_f32(3.0);

    let input: Vec<_> = [-f16::ONE, three, -two, two]
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-two));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(three));
}
2621
/// Float16 statistics with a NaN between two finite values: min and max
/// are the finite values `1` and `2`.
#[test]
fn test_column_writer_check_float16_nan_middle() {
    let two = f16::ONE + f16::ONE;
    let input: Vec<_> = [f16::ONE, f16::NAN, two]
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(two));
}
2637
/// Float16 statistics for `[1, NaN, 2]`: the NaN is not reflected in min
/// (`1`) or max (`2`).
#[test]
fn test_float16_statistics_nan_middle() {
    let largest = f16::ONE + f16::ONE;
    let raw = [f16::ONE, f16::NAN, largest];
    let input: Vec<_> = raw
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(largest));
}
2653
/// Float16 statistics for `[NaN, 1, 2]`: a leading NaN is not reflected in
/// min (`1`) or max (`2`).
#[test]
fn test_float16_statistics_nan_start() {
    let largest = f16::ONE + f16::ONE;
    let raw = [f16::NAN, f16::ONE, largest];
    let input: Vec<_> = raw
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
    assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(largest));
}
2669
/// Float16 statistics for an all-NaN column: no min or max bytes are present.
#[test]
fn test_float16_statistics_nan_only() {
    let raw = [f16::NAN, f16::NAN];
    let input: Vec<_> = raw
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
    assert!(stats.is_min_max_backwards_compatible());
}
2682
/// Float16 statistics for a single `+0`: min is reported as `-0` and max as `+0`.
#[test]
fn test_float16_statistics_zero_only() {
    let raw = [f16::ZERO];
    let input: Vec<_> = raw
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let expected_min = ByteArray::from(f16::NEG_ZERO);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::ZERO);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2695
/// Float16 statistics for a single `-0`: min is reported as `-0` and max as `+0`.
#[test]
fn test_float16_statistics_neg_zero_only() {
    let raw = [f16::NEG_ZERO];
    let input: Vec<_> = raw
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let expected_min = ByteArray::from(f16::NEG_ZERO);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::ZERO);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2708
/// Float16 statistics for `[+0, 1, NaN, PI]`: min is reported as `-0` and
/// max as `PI`.
#[test]
fn test_float16_statistics_zero_min() {
    let raw = [f16::ZERO, f16::ONE, f16::NAN, f16::PI];
    let input: Vec<_> = raw
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let expected_min = ByteArray::from(f16::NEG_ZERO);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::PI);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2721
/// Float16 statistics for `[-0, -1, NaN, -PI]`: min is `-PI` and max is
/// reported as `+0`.
#[test]
fn test_float16_statistics_neg_zero_max() {
    let raw = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI];
    let input: Vec<_> = raw
        .into_iter()
        .map(|value| ByteArray::from(value).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let expected_min = ByteArray::from(-f16::PI);
    assert_eq!(stats.min_opt().unwrap(), &expected_min);
    let expected_max = ByteArray::from(f16::ZERO);
    assert_eq!(stats.max_opt().unwrap(), &expected_max);
}
2734
/// Float statistics for `[1.0, NaN, 2.0]`: min is `1.0` and max is `2.0`.
#[test]
fn test_float_statistics_nan_middle() {
    let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2746
/// Float statistics for `[NaN, 1.0, 2.0]`: min is `1.0` and max is `2.0`.
#[test]
fn test_float_statistics_nan_start() {
    let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2758
/// Float statistics for an all-NaN column: no min or max bytes are present,
/// and the statistics are still of the `Float` variant.
#[test]
fn test_float_statistics_nan_only() {
    let all_nan = [f32::NAN, f32::NAN];
    let stats = statistics_roundtrip::<FloatType>(&all_nan);
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
    assert!(stats.is_min_max_backwards_compatible());
    assert!(matches!(stats, Statistics::Float(_)));
}
2767
/// Float statistics for a single `0.0`: min is negative zero and max is
/// positive zero.
#[test]
fn test_float_statistics_zero_only() {
    let stats = statistics_roundtrip::<FloatType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2781
/// Float statistics for a single `-0.0`: min is negative zero and max is
/// positive zero.
#[test]
fn test_float_statistics_neg_zero_only() {
    let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2795
/// Float statistics for `[0.0, 1.0, NaN, 2.0]`: min is negative zero and
/// max is `2.0`.
#[test]
fn test_float_statistics_zero_min() {
    let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2808
/// Float statistics for `[-0.0, -1.0, NaN, -2.0]`: min is `-2.0` and max is
/// positive zero.
#[test]
fn test_float_statistics_neg_zero_max() {
    let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2821
/// Double statistics for `[1.0, NaN, 2.0]`: min is `1.0` and max is `2.0`.
#[test]
fn test_double_statistics_nan_middle() {
    let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2833
/// Double statistics for `[NaN, 1.0, 2.0]`: min is `1.0` and max is `2.0`.
#[test]
fn test_double_statistics_nan_start() {
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &1.0);
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2845
/// Double statistics for an all-NaN column: no min or max bytes are present,
/// and the statistics are still of the `Double` variant.
#[test]
fn test_double_statistics_nan_only() {
    let all_nan = [f64::NAN, f64::NAN];
    let stats = statistics_roundtrip::<DoubleType>(&all_nan);
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
    assert!(matches!(stats, Statistics::Double(_)));
    assert!(stats.is_min_max_backwards_compatible());
}
2854
/// Double statistics for a single `0.0`: min is negative zero and max is
/// positive zero.
#[test]
fn test_double_statistics_zero_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2868
/// Double statistics for a single `-0.0`: min is negative zero and max is
/// positive zero.
#[test]
fn test_double_statistics_neg_zero_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2882
/// Double statistics for `[0.0, 1.0, NaN, 2.0]`: min is negative zero and
/// max is `2.0`.
#[test]
fn test_double_statistics_zero_min() {
    let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(typed.max_opt().unwrap(), &2.0);
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2895
/// Double statistics for `[-0.0, -1.0, NaN, -2.0]`: min is `-2.0` and max is
/// positive zero.
#[test]
fn test_double_statistics_neg_zero_max() {
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match &stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt().unwrap(), &-2.0);
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2908
/// Table-driven check of `compare_greater_byte_array_decimals`: each row is
/// `(a, b, expected result of comparing a against b)`.
#[test]
fn test_compare_greater_byte_array_decimals() {
    let cases: [(&[u8], &[u8], bool); 9] = [
        (&[], &[], false),
        (&[1u8], &[], true),
        (&[], &[1u8], false),
        (&[1u8], &[0u8], true),
        (&[1u8], &[1u8], false),
        (&[1u8, 0u8], &[0u8], true),
        (&[0u8, 1u8], &[1u8, 0u8], false),
        (&[255u8, 35u8, 0u8, 0u8], &[0u8], false),
        (&[0u8], &[255u8, 35u8, 0u8, 0u8], true),
    ];

    for (a, b, expected) in cases.iter().copied() {
        assert_eq!(compare_greater_byte_array_decimals(a, b), expected);
    }
}
2930
/// Writes four definition levels of 0 and no values, then checks the column
/// index entry recorded for that page.
#[test]
fn test_column_index_with_null_pages() {
    let page_writer = get_test_page_writer();
    let mut writer =
        get_test_column_writer::<Int32Type>(page_writer, 1, 0, Default::default());
    writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.column_index.is_some());
    let index = close_result.column_index.unwrap();

    // The first page is flagged as a null page with empty min/max values.
    assert!(index.null_pages[0]);
    assert_eq!(index.min_values[0].len(), 0);
    assert_eq!(index.max_values[0].len(), 0);

    // Null counts are present and equal to 4 for the first page.
    assert!(index.null_counts.is_some());
    assert_eq!(index.null_counts.as_ref().unwrap()[0], 4);

    // No repetition level histogram; the definition level histogram is [4, 0].
    assert!(index.repetition_level_histograms.is_none());
    assert!(index.definition_level_histograms.is_some());
    assert_eq!(index.definition_level_histograms.unwrap(), &[4, 0]);
}
2956
/// Writes two batches of `i32` values separated by an explicit page flush
/// and checks the resulting column index, offset index and chunk statistics.
#[test]
fn test_column_offset_index_metadata() {
    let page_writer = get_test_page_writer();
    let mut writer =
        get_test_column_writer::<Int32Type>(page_writer, 0, 0, Default::default());

    // Flush between the two batches.
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.flush_data_pages().unwrap();
    writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(8, r.rows_written);

    assert_eq!(2, column_index.null_pages.len());
    assert_eq!(2, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
    let null_counts = column_index.null_counts.as_ref().unwrap();
    for page in 0..2 {
        assert!(!column_index.null_pages[page]);
        assert_eq!(0, null_counts[page]);
    }

    let chunk_stats = match r.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert_eq!(chunk_stats.distinct_count_opt(), None);
    match chunk_stats {
        Statistics::Int32(typed) => {
            // Chunk-level min/max bytes equal the column index entries at index 1.
            assert_eq!(
                typed.min_bytes_opt(),
                Some(column_index.min_values[1].as_slice())
            );
            assert_eq!(
                typed.max_bytes_opt(),
                column_index.max_values.get(1).map(Vec::as_slice)
            );
        }
        _ => panic!("expecting Statistics::Int32"),
    }

    let locations = &offset_index.page_locations;
    assert_eq!(0, locations[0].first_row_index);
    assert_eq!(4, locations[1].first_row_index);
}
3011
/// Writes three 200-byte fixed-length values with statistics truncation set
/// to `None` and checks that the column index min/max differ from the chunk
/// statistics and have the default column index truncate length.
#[test]
fn test_column_offset_index_metadata_truncating() {
    let page_writer = get_test_page_writer();
    let builder = WriterProperties::builder().set_statistics_truncate_length(None);
    let props = Arc::new(builder.build());
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    // Three values, each 200 repetitions of a single byte: 97, 112 and 98.
    let data: Vec<FixedLenByteArray> = [97_u8, 112_u8, 98_u8]
        .iter()
        .map(|&fill| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(vec![fill; 200]));
            value
        })
        .collect();

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(3, r.rows_written);

    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let chunk_stats = match r.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert_eq!(chunk_stats.distinct_count_opt(), None);
    match chunk_stats {
        Statistics::FixedLenByteArray(typed) => {
            let index_min = &column_index.min_values[0];
            let index_max = &column_index.max_values[0];

            // The column index values must differ from the chunk statistics.
            assert_ne!(typed.min_bytes_opt(), Some(index_min.as_slice()));
            assert_ne!(typed.max_bytes_opt(), Some(index_max.as_slice()));

            assert_eq!(
                index_min.len(),
                DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
            );
            assert_eq!(index_min.as_slice(), &[97_u8; 64]);
            assert_eq!(
                index_max.len(),
                DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
            );

            // The last byte of the max is one greater than its first byte.
            let first_byte = *index_max.first().unwrap();
            let last_byte = *index_max.last().unwrap();
            assert_eq!(last_byte, first_byte + 1);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3087
/// With a column index truncate length of 1, the value
/// "Blart Versenwald III" yields a column index min of "B" and max of "C",
/// both different from the chunk-level statistics.
#[test]
fn test_column_offset_index_truncating_spec_example() {
    let page_writer = get_test_page_writer();

    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut value = FixedLenByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(1, r.rows_written);

    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let chunk_stats = match r.metadata.statistics() {
        Some(found) => found,
        None => panic!("metadata missing statistics"),
    };
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert_eq!(chunk_stats.distinct_count_opt(), None);
    match chunk_stats {
        Statistics::FixedLenByteArray(_) => {
            let index_min = &column_index.min_values[0];
            let index_max = &column_index.max_values[0];

            assert_eq!(index_min.len(), 1);
            assert_eq!(index_max.len(), 1);

            assert_eq!("B".as_bytes(), index_min.as_slice());
            assert_eq!("C".as_bytes(), index_max.as_slice());

            // Compared against the chunk-level statistics bytes.
            assert_ne!(index_min, chunk_stats.min_bytes_opt().unwrap());
            assert_ne!(index_max, chunk_stats.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3142
/// Even with a column index truncate length of 1, a float16 value is
/// expected to appear in full in both the column index and the chunk
/// statistics.
#[test]
fn test_float16_min_max_no_truncation() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let page_writer = get_test_page_writer();
    let mut writer = get_test_float16_column_writer(page_writer, props);

    let expected_value = f16::PI.to_le_bytes().to_vec();
    let data = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    // Column index min/max hold the full value.
    let column_index = r.column_index.unwrap();
    let index_min = column_index.min_values[0].as_slice();
    let index_max = column_index.max_values[0].as_slice();
    assert_eq!(expected_value, index_min);
    assert_eq!(expected_value, index_max);

    // Chunk statistics min/max hold the full value as well.
    match r.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            let stats_min = typed.min_bytes_opt().unwrap();
            let stats_max = typed.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min);
            assert_eq!(expected_value, stats_max);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3177
/// Even with a column index truncate length of 1, a 16-byte decimal value
/// is expected to appear in full in both the column index and the chunk
/// statistics.
#[test]
fn test_decimal_min_max_no_truncation() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let page_writer = get_test_page_writer();
    let mut writer =
        get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let expected_value = vec![
        255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
        231u8, 90u8, 0u8, 0u8,
    ];
    let data = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    // Column index min/max hold the full value.
    let column_index = r.column_index.unwrap();
    let index_min = column_index.min_values[0].as_slice();
    let index_max = column_index.max_values[0].as_slice();
    assert_eq!(expected_value, index_min);
    assert_eq!(expected_value, index_max);

    // Chunk statistics min/max hold the full value as well.
    match r.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            let stats_min = typed.min_bytes_opt().unwrap();
            let stats_max = typed.max_bytes_opt().unwrap();
            assert_eq!(expected_value, stats_min);
            assert_eq!(expected_value, stats_max);
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3216
/// With default writer properties, a byte array value longer than 64 bytes
/// is expected to produce inexact 64-byte min/max statistics.
#[test]
fn test_statistics_truncating_byte_array_default() {
    let page_writer = get_test_page_writer();

    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from(
        "This string is longer than 64 bytes, so it will almost certainly be truncated.",
    )));
    let data = vec![value];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    assert_eq!(1, r.rows_written);

    match r.metadata.statistics().expect("statistics") {
        Statistics::ByteArray(typed) => {
            let min_value = typed.min_opt().unwrap();
            let max_value = typed.max_opt().unwrap();

            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            let expected_len = 64;
            assert_eq!(min_value.len(), expected_len);
            assert_eq!(max_value.len(), expected_len);

            let expected_min =
                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
            assert_eq!(expected_min, min_value.as_bytes());
            // The expected max differs from the min only in its final byte ('y' -> 'z').
            let expected_max =
                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
            assert_eq!(expected_max, max_value.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
3259
/// With a statistics truncate length of 1, the value "Blart Versenwald III"
/// is expected to produce inexact one-byte statistics: min "B" and max "C".
#[test]
fn test_statistics_truncating_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    let page_writer = get_test_page_writer();

    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    assert_eq!(1, r.rows_written);

    let chunk_stats = r.metadata.statistics().expect("statistics");
    assert_eq!(chunk_stats.null_count_opt(), Some(0));
    assert_eq!(chunk_stats.distinct_count_opt(), None);
    match chunk_stats {
        Statistics::ByteArray(typed) => {
            let min_value = typed.min_opt().unwrap();
            let max_value = typed.max_opt().unwrap();

            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!("B".as_bytes(), min_value.as_bytes());
            assert_eq!("C".as_bytes(), max_value.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
3303
3304 #[test]
3305 fn test_statistics_truncating_fixed_len_byte_array() {
3306 let page_writer = get_test_page_writer();
3307
3308 const TEST_TRUNCATE_LENGTH: usize = 1;
3309
3310 let builder =
3312 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3313 let props = Arc::new(builder.build());
3314 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3315
3316 let mut data = vec![FixedLenByteArray::default(); 1];
3317
3318 const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3319 const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3320
3321 const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]]; const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3323 [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3324
3325 data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3327
3328 writer.write_batch(&data, None, None).unwrap();
3329
3330 writer.flush_data_pages().unwrap();
3331
3332 let r = writer.close().unwrap();
3333
3334 assert_eq!(1, r.rows_written);
3335
3336 let stats = r.metadata.statistics().expect("statistics");
3337 assert_eq!(stats.null_count_opt(), Some(0));
3338 assert_eq!(stats.distinct_count_opt(), None);
3339 if let Statistics::FixedLenByteArray(_stats) = stats {
3340 let min_value = _stats.min_opt().unwrap();
3341 let max_value = _stats.max_opt().unwrap();
3342
3343 assert!(!_stats.min_is_exact());
3344 assert!(!_stats.max_is_exact());
3345
3346 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3347 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3348
3349 assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3350 assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3351
3352 let reconstructed_min = i128::from_be_bytes([
3353 min_value.as_bytes()[0],
3354 0,
3355 0,
3356 0,
3357 0,
3358 0,
3359 0,
3360 0,
3361 0,
3362 0,
3363 0,
3364 0,
3365 0,
3366 0,
3367 0,
3368 0,
3369 ]);
3370
3371 let reconstructed_max = i128::from_be_bytes([
3372 max_value.as_bytes()[0],
3373 0,
3374 0,
3375 0,
3376 0,
3377 0,
3378 0,
3379 0,
3380 0,
3381 0,
3382 0,
3383 0,
3384 0,
3385 0,
3386 0,
3387 0,
3388 ]);
3389
3390 println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3392 assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3393 println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3394 assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3395 } else {
3396 panic!("expecting Statistics::FixedLenByteArray");
3397 }
3398 }
3399
3400 #[test]
3401 fn test_send() {
3402 fn test<T: Send>() {}
3403 test::<ColumnWriterImpl<Int32Type>>();
3404 }
3405
3406 #[test]
3407 fn test_increment() {
3408 let v = increment(vec![0, 0, 0]).unwrap();
3409 assert_eq!(&v, &[0, 0, 1]);
3410
3411 let v = increment(vec![0, 255, 255]).unwrap();
3413 assert_eq!(&v, &[1, 0, 0]);
3414
3415 let v = increment(vec![255, 255, 255]);
3417 assert!(v.is_none());
3418 }
3419
3420 #[test]
3421 fn test_increment_utf8() {
3422 let test_inc = |o: &str, expected: &str| {
3423 if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3424 assert_eq!(v, expected);
3426 assert!(*v > *o);
3428 let mut greater = ByteArray::new();
3430 greater.set_data(Bytes::from(v));
3431 let mut original = ByteArray::new();
3432 original.set_data(Bytes::from(o.as_bytes().to_vec()));
3433 assert!(greater > original);
3434 } else {
3435 panic!("Expected incremented UTF8 string to also be valid.");
3436 }
3437 };
3438
3439 test_inc("hello", "hellp");
3441
3442 test_inc("a\u{7f}", "b");
3444
3445 assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3447
3448 test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3450
3451 test_inc("éééé", "éééê");
3453
3454 test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3456
3457 test_inc("a\u{7ff}", "b");
3459
3460 assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3462
3463 test_inc("ࠀࠀ", "ࠀࠁ");
3466
3467 test_inc("a\u{ffff}", "b");
3469
3470 assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3472
3473 test_inc("𐀀𐀀", "𐀀𐀁");
3475
3476 test_inc("a\u{10ffff}", "b");
3478
3479 assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3481
3482 test_inc("a\u{D7FF}", "b");
3485 }
3486
3487 #[test]
3488 fn test_truncate_utf8() {
3489 let data = "❤️🧡💛💚💙💜";
3491 let r = truncate_utf8(data, data.len()).unwrap();
3492 assert_eq!(r.len(), data.len());
3493 assert_eq!(&r, data.as_bytes());
3494
3495 let r = truncate_utf8(data, 13).unwrap();
3497 assert_eq!(r.len(), 10);
3498 assert_eq!(&r, "❤️🧡".as_bytes());
3499
3500 let r = truncate_utf8("\u{0836}", 1);
3502 assert!(r.is_none());
3503
3504 let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3507 assert_eq!(&r, "yyyyyyyz".as_bytes());
3508
3509 let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3511 assert_eq!(&r, "ééê".as_bytes());
3512
3513 let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3515 assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3516
3517 let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3519 assert!(r.is_none());
3520
3521 let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3524 assert_eq!(&r, "ࠀࠁ".as_bytes());
3525
3526 let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3528 assert!(r.is_none());
3529
3530 let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3532 assert_eq!(&r, "𐀀𐀁".as_bytes());
3533
3534 let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3536 assert!(r.is_none());
3537 }
3538
3539 #[test]
3540 fn test_byte_array_truncate_invalid_utf8_statistics() {
3543 let message_type = "
3544 message test_schema {
3545 OPTIONAL BYTE_ARRAY a (UTF8);
3546 }
3547 ";
3548 let schema = Arc::new(parse_message_type(message_type).unwrap());
3549
3550 let data = vec![ByteArray::from(vec![128u8; 32]); 7];
3552 let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3553 let file: File = tempfile::tempfile().unwrap();
3554 let props = Arc::new(
3555 WriterProperties::builder()
3556 .set_statistics_enabled(EnabledStatistics::Chunk)
3557 .set_statistics_truncate_length(Some(8))
3558 .build(),
3559 );
3560
3561 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3562 let mut row_group_writer = writer.next_row_group().unwrap();
3563
3564 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3565 col_writer
3566 .typed::<ByteArrayType>()
3567 .write_batch(&data, Some(&def_levels), None)
3568 .unwrap();
3569 col_writer.close().unwrap();
3570 row_group_writer.close().unwrap();
3571 let file_metadata = writer.close().unwrap();
3572 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
3573 let stats = file_metadata.row_groups[0].columns[0]
3574 .meta_data
3575 .as_ref()
3576 .unwrap()
3577 .statistics
3578 .as_ref()
3579 .unwrap();
3580 assert!(!stats.is_max_value_exact.unwrap());
3581 assert_eq!(
3584 stats.max_value,
3585 Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3586 );
3587 }
3588
3589 #[test]
3590 fn test_increment_max_binary_chars() {
3591 let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3592 assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3593
3594 let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3595 assert!(incremented.is_none())
3596 }
3597
3598 #[test]
3599 fn test_no_column_index_when_stats_disabled() {
3600 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3604 let props = Arc::new(
3605 WriterProperties::builder()
3606 .set_statistics_enabled(EnabledStatistics::None)
3607 .build(),
3608 );
3609 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3610 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3611
3612 let data = Vec::new();
3613 let def_levels = vec![0; 10];
3614 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3615 writer.flush_data_pages().unwrap();
3616
3617 let column_close_result = writer.close().unwrap();
3618 assert!(column_close_result.offset_index.is_some());
3619 assert!(column_close_result.column_index.is_none());
3620 }
3621
3622 #[test]
3623 fn test_no_offset_index_when_disabled() {
3624 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3626 let props = Arc::new(
3627 WriterProperties::builder()
3628 .set_statistics_enabled(EnabledStatistics::None)
3629 .set_offset_index_disabled(true)
3630 .build(),
3631 );
3632 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3633 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3634
3635 let data = Vec::new();
3636 let def_levels = vec![0; 10];
3637 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3638 writer.flush_data_pages().unwrap();
3639
3640 let column_close_result = writer.close().unwrap();
3641 assert!(column_close_result.offset_index.is_none());
3642 assert!(column_close_result.column_index.is_none());
3643 }
3644
3645 #[test]
3646 fn test_offset_index_overridden() {
3647 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3649 let props = Arc::new(
3650 WriterProperties::builder()
3651 .set_statistics_enabled(EnabledStatistics::Page)
3652 .set_offset_index_disabled(true)
3653 .build(),
3654 );
3655 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3656 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3657
3658 let data = Vec::new();
3659 let def_levels = vec![0; 10];
3660 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3661 writer.flush_data_pages().unwrap();
3662
3663 let column_close_result = writer.close().unwrap();
3664 assert!(column_close_result.offset_index.is_some());
3665 assert!(column_close_result.column_index.is_some());
3666 }
3667
3668 #[test]
3669 fn test_boundary_order() -> Result<()> {
3670 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3671 let column_close_result = write_multiple_pages::<Int32Type>(
3673 &descr,
3674 &[
3675 &[Some(-10), Some(10)],
3676 &[Some(-5), Some(11)],
3677 &[None],
3678 &[Some(-5), Some(11)],
3679 ],
3680 )?;
3681 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3682 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3683
3684 let column_close_result = write_multiple_pages::<Int32Type>(
3686 &descr,
3687 &[
3688 &[Some(10), Some(11)],
3689 &[Some(5), Some(11)],
3690 &[None],
3691 &[Some(-5), Some(0)],
3692 ],
3693 )?;
3694 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3695 assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3696
3697 let column_close_result = write_multiple_pages::<Int32Type>(
3699 &descr,
3700 &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3701 )?;
3702 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3703 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3704
3705 let column_close_result =
3707 write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3708 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3709 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3710
3711 let column_close_result =
3713 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3714 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3715 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3716
3717 let column_close_result =
3719 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3720 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3721 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3722
3723 let column_close_result = write_multiple_pages::<Int32Type>(
3725 &descr,
3726 &[
3727 &[Some(10), Some(11)],
3728 &[Some(11), Some(16)],
3729 &[None],
3730 &[Some(-5), Some(0)],
3731 ],
3732 )?;
3733 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3734 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3735
3736 let column_close_result = write_multiple_pages::<Int32Type>(
3738 &descr,
3739 &[
3740 &[Some(1), Some(9)],
3741 &[Some(2), Some(8)],
3742 &[None],
3743 &[Some(3), Some(7)],
3744 ],
3745 )?;
3746 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3747 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3748
3749 Ok(())
3750 }
3751
3752 #[test]
3753 fn test_boundary_order_logical_type() -> Result<()> {
3754 let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3757 let fba_descr = {
3758 let tpe = SchemaType::primitive_type_builder(
3759 "col",
3760 FixedLenByteArrayType::get_physical_type(),
3761 )
3762 .with_length(2)
3763 .build()?;
3764 Arc::new(ColumnDescriptor::new(
3765 Arc::new(tpe),
3766 1,
3767 0,
3768 ColumnPath::from("col"),
3769 ))
3770 };
3771
3772 let values: &[&[Option<FixedLenByteArray>]] = &[
3773 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3774 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3775 &[Some(FixedLenByteArray::from(ByteArray::from(
3776 f16::NEG_ZERO,
3777 )))],
3778 &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3779 ];
3780
3781 let column_close_result =
3783 write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3784 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3785 assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3786
3787 let column_close_result =
3789 write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3790 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3791 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3792
3793 Ok(())
3794 }
3795
3796 #[test]
3797 fn test_interval_stats_should_not_have_min_max() {
3798 let input = [
3799 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3800 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3801 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3802 ]
3803 .into_iter()
3804 .map(|s| ByteArray::from(s).into())
3805 .collect::<Vec<_>>();
3806
3807 let page_writer = get_test_page_writer();
3808 let mut writer = get_test_interval_column_writer(page_writer);
3809 writer.write_batch(&input, None, None).unwrap();
3810
3811 let metadata = writer.close().unwrap().metadata;
3812 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3813 stats.clone()
3814 } else {
3815 panic!("metadata missing statistics");
3816 };
3817 assert!(stats.min_bytes_opt().is_none());
3818 assert!(stats.max_bytes_opt().is_none());
3819 }
3820
3821 #[test]
3822 #[cfg(feature = "arrow")]
3823 fn test_column_writer_get_estimated_total_bytes() {
3824 let page_writer = get_test_page_writer();
3825 let props = Default::default();
3826 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3827 assert_eq!(writer.get_estimated_total_bytes(), 0);
3828
3829 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3830 writer.add_data_page().unwrap();
3831 let size_with_one_page = writer.get_estimated_total_bytes();
3832 assert_eq!(size_with_one_page, 20);
3833
3834 writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3835 writer.add_data_page().unwrap();
3836 let size_with_two_pages = writer.get_estimated_total_bytes();
3837 assert_eq!(size_with_two_pages, 20 + 21);
3839 }
3840
3841 fn write_multiple_pages<T: DataType>(
3842 column_descr: &Arc<ColumnDescriptor>,
3843 pages: &[&[Option<T::T>]],
3844 ) -> Result<ColumnCloseResult> {
3845 let column_writer = get_column_writer(
3846 column_descr.clone(),
3847 Default::default(),
3848 get_test_page_writer(),
3849 );
3850 let mut writer = get_typed_column_writer::<T>(column_writer);
3851
3852 for &page in pages {
3853 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3854 let def_levels = page
3855 .iter()
3856 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3857 .collect::<Vec<_>>();
3858 writer.write_batch(&values, Some(&def_levels), None)?;
3859 writer.flush_data_pages()?;
3860 }
3861
3862 writer.close()
3863 }
3864
3865 fn column_roundtrip_random<T: DataType>(
3869 props: WriterProperties,
3870 max_size: usize,
3871 min_value: T::T,
3872 max_value: T::T,
3873 max_def_level: i16,
3874 max_rep_level: i16,
3875 ) where
3876 T::T: PartialOrd + SampleUniform + Copy,
3877 {
3878 let mut num_values: usize = 0;
3879
3880 let mut buf: Vec<i16> = Vec::new();
3881 let def_levels = if max_def_level > 0 {
3882 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3883 for &dl in &buf[..] {
3884 if dl == max_def_level {
3885 num_values += 1;
3886 }
3887 }
3888 Some(&buf[..])
3889 } else {
3890 num_values = max_size;
3891 None
3892 };
3893
3894 let mut buf: Vec<i16> = Vec::new();
3895 let rep_levels = if max_rep_level > 0 {
3896 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3897 buf[0] = 0; Some(&buf[..])
3899 } else {
3900 None
3901 };
3902
3903 let mut values: Vec<T::T> = Vec::new();
3904 random_numbers_range(num_values, min_value, max_value, &mut values);
3905
3906 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3907 }
3908
3909 fn column_roundtrip<T: DataType>(
3911 props: WriterProperties,
3912 values: &[T::T],
3913 def_levels: Option<&[i16]>,
3914 rep_levels: Option<&[i16]>,
3915 ) {
3916 let mut file = tempfile::tempfile().unwrap();
3917 let mut write = TrackedWrite::new(&mut file);
3918 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3919
3920 let max_def_level = match def_levels {
3921 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3922 None => 0i16,
3923 };
3924
3925 let max_rep_level = match rep_levels {
3926 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3927 None => 0i16,
3928 };
3929
3930 let mut max_batch_size = values.len();
3931 if let Some(levels) = def_levels {
3932 max_batch_size = max_batch_size.max(levels.len());
3933 }
3934 if let Some(levels) = rep_levels {
3935 max_batch_size = max_batch_size.max(levels.len());
3936 }
3937
3938 let mut writer =
3939 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3940
3941 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3942 assert_eq!(values_written, values.len());
3943 let result = writer.close().unwrap();
3944
3945 drop(write);
3946
3947 let props = ReaderProperties::builder()
3948 .set_backward_compatible_lz4(false)
3949 .build();
3950 let page_reader = Box::new(
3951 SerializedPageReader::new_with_properties(
3952 Arc::new(file),
3953 &result.metadata,
3954 result.rows_written as usize,
3955 None,
3956 Arc::new(props),
3957 )
3958 .unwrap(),
3959 );
3960 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3961
3962 let mut actual_values = Vec::with_capacity(max_batch_size);
3963 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3964 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3965
3966 let (_, values_read, levels_read) = reader
3967 .read_records(
3968 max_batch_size,
3969 actual_def_levels.as_mut(),
3970 actual_rep_levels.as_mut(),
3971 &mut actual_values,
3972 )
3973 .unwrap();
3974
3975 assert_eq!(&actual_values[..values_read], values);
3978 match actual_def_levels {
3979 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3980 None => assert_eq!(None, def_levels),
3981 }
3982 match actual_rep_levels {
3983 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3984 None => assert_eq!(None, rep_levels),
3985 }
3986
3987 if let Some(levels) = actual_rep_levels {
3990 let mut actual_rows_written = 0;
3991 for l in levels {
3992 if l == 0 {
3993 actual_rows_written += 1;
3994 }
3995 }
3996 assert_eq!(actual_rows_written, result.rows_written);
3997 } else if actual_def_levels.is_some() {
3998 assert_eq!(levels_read as u64, result.rows_written);
3999 } else {
4000 assert_eq!(values_read as u64, result.rows_written);
4001 }
4002 }
4003
4004 fn column_write_and_get_metadata<T: DataType>(
4007 props: WriterProperties,
4008 values: &[T::T],
4009 ) -> ColumnChunkMetaData {
4010 let page_writer = get_test_page_writer();
4011 let props = Arc::new(props);
4012 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4013 writer.write_batch(values, None, None).unwrap();
4014 writer.close().unwrap().metadata
4015 }
4016
4017 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4019 PageEncodingStats {
4020 page_type,
4021 encoding,
4022 count,
4023 }
4024 }
4025
4026 fn check_encoding_write_support<T: DataType>(
4030 version: WriterVersion,
4031 dict_enabled: bool,
4032 data: &[T::T],
4033 dictionary_page_offset: Option<i64>,
4034 encodings: &[Encoding],
4035 page_encoding_stats: &[PageEncodingStats],
4036 ) {
4037 let props = WriterProperties::builder()
4038 .set_writer_version(version)
4039 .set_dictionary_enabled(dict_enabled)
4040 .build();
4041 let meta = column_write_and_get_metadata::<T>(props, data);
4042 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4043 assert_eq!(meta.encodings(), encodings);
4044 assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4045 }
4046
4047 fn get_test_column_writer<'a, T: DataType>(
4049 page_writer: Box<dyn PageWriter + 'a>,
4050 max_def_level: i16,
4051 max_rep_level: i16,
4052 props: WriterPropertiesPtr,
4053 ) -> ColumnWriterImpl<'a, T> {
4054 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4055 let column_writer = get_column_writer(descr, props, page_writer);
4056 get_typed_column_writer::<T>(column_writer)
4057 }
4058
4059 fn get_test_column_reader<T: DataType>(
4061 page_reader: Box<dyn PageReader>,
4062 max_def_level: i16,
4063 max_rep_level: i16,
4064 ) -> ColumnReaderImpl<T> {
4065 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4066 let column_reader = get_column_reader(descr, page_reader);
4067 get_typed_column_reader::<T>(column_reader)
4068 }
4069
4070 fn get_test_column_descr<T: DataType>(
4072 max_def_level: i16,
4073 max_rep_level: i16,
4074 ) -> ColumnDescriptor {
4075 let path = ColumnPath::from("col");
4076 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4077 .with_length(1)
4080 .build()
4081 .unwrap();
4082 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4083 }
4084
4085 fn get_test_page_writer() -> Box<dyn PageWriter> {
4087 Box::new(TestPageWriter {})
4088 }
4089
4090 struct TestPageWriter {}
4091
4092 impl PageWriter for TestPageWriter {
4093 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4094 let mut res = PageWriteSpec::new();
4095 res.page_type = page.page_type();
4096 res.uncompressed_size = page.uncompressed_size();
4097 res.compressed_size = page.compressed_size();
4098 res.num_values = page.num_values();
4099 res.offset = 0;
4100 res.bytes_written = page.data().len() as u64;
4101 Ok(res)
4102 }
4103
4104 fn close(&mut self) -> Result<()> {
4105 Ok(())
4106 }
4107 }
4108
4109 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4111 let page_writer = get_test_page_writer();
4112 let props = Default::default();
4113 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4114 writer.write_batch(values, None, None).unwrap();
4115
4116 let metadata = writer.close().unwrap().metadata;
4117 if let Some(stats) = metadata.statistics() {
4118 stats.clone()
4119 } else {
4120 panic!("metadata missing statistics");
4121 }
4122 }
4123
4124 fn get_test_decimals_column_writer<T: DataType>(
4126 page_writer: Box<dyn PageWriter>,
4127 max_def_level: i16,
4128 max_rep_level: i16,
4129 props: WriterPropertiesPtr,
4130 ) -> ColumnWriterImpl<'static, T> {
4131 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4132 max_def_level,
4133 max_rep_level,
4134 ));
4135 let column_writer = get_column_writer(descr, props, page_writer);
4136 get_typed_column_writer::<T>(column_writer)
4137 }
4138
4139 fn get_test_decimals_column_descr<T: DataType>(
4141 max_def_level: i16,
4142 max_rep_level: i16,
4143 ) -> ColumnDescriptor {
4144 let path = ColumnPath::from("col");
4145 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4146 .with_length(16)
4147 .with_logical_type(Some(LogicalType::Decimal {
4148 scale: 2,
4149 precision: 3,
4150 }))
4151 .with_scale(2)
4152 .with_precision(3)
4153 .build()
4154 .unwrap();
4155 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4156 }
4157
4158 fn float16_statistics_roundtrip(
4159 values: &[FixedLenByteArray],
4160 ) -> ValueStatistics<FixedLenByteArray> {
4161 let page_writer = get_test_page_writer();
4162 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4163 writer.write_batch(values, None, None).unwrap();
4164
4165 let metadata = writer.close().unwrap().metadata;
4166 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4167 stats.clone()
4168 } else {
4169 panic!("metadata missing statistics");
4170 }
4171 }
4172
4173 fn get_test_float16_column_writer(
4174 page_writer: Box<dyn PageWriter>,
4175 props: WriterPropertiesPtr,
4176 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4177 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4178 let column_writer = get_column_writer(descr, props, page_writer);
4179 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4180 }
4181
4182 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4183 let path = ColumnPath::from("col");
4184 let tpe =
4185 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4186 .with_length(2)
4187 .with_logical_type(Some(LogicalType::Float16))
4188 .build()
4189 .unwrap();
4190 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4191 }
4192
4193 fn get_test_interval_column_writer(
4194 page_writer: Box<dyn PageWriter>,
4195 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4196 let descr = Arc::new(get_test_interval_column_descr());
4197 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4198 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4199 }
4200
4201 fn get_test_interval_column_descr() -> ColumnDescriptor {
4202 let path = ColumnPath::from("col");
4203 let tpe =
4204 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4205 .with_length(12)
4206 .with_converted_type(ConvertedType::INTERVAL)
4207 .build()
4208 .unwrap();
4209 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4210 }
4211
4212 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4214 page_writer: Box<dyn PageWriter + 'a>,
4215 max_def_level: i16,
4216 max_rep_level: i16,
4217 props: WriterPropertiesPtr,
4218 ) -> ColumnWriterImpl<'a, T> {
4219 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4220 max_def_level,
4221 max_rep_level,
4222 ));
4223 let column_writer = get_column_writer(descr, props, page_writer);
4224 get_typed_column_writer::<T>(column_writer)
4225 }
4226
4227 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4229 max_def_level: i16,
4230 max_rep_level: i16,
4231 ) -> ColumnDescriptor {
4232 let path = ColumnPath::from("col");
4233 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4234 .with_converted_type(ConvertedType::UINT_32)
4235 .build()
4236 .unwrap();
4237 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4238 }
4239}