1use bytes::Bytes;
21use half::f16;
22
23use crate::bloom_filter::Sbbf;
24use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
25use std::collections::{BTreeSet, VecDeque};
26use std::str;
27
28use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
29use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
30use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
31use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
32use crate::data_type::private::ParquetValueType;
33use crate::data_type::*;
34use crate::encodings::levels::LevelEncoder;
35#[cfg(feature = "encryption")]
36use crate::encryption::encrypt::get_column_crypto_metadata;
37use crate::errors::{ParquetError, Result};
38use crate::file::metadata::{
39 ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
40 OffsetIndexBuilder,
41};
42use crate::file::page_encoding_stats::PageEncodingStats;
43use crate::file::properties::{
44 EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
45};
46use crate::file::statistics::{Statistics, ValueStatistics};
47use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
48
49pub(crate) mod encoder;
50
/// Matches on every [`ColumnWriter`] variant, binds the wrapped typed writer to `$i`,
/// and evaluates `$b` with that binding.
///
/// The arms are spelled `Self::...`, so the macro can only be expanded in a context
/// where `Self` is [`ColumnWriter`].
macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}
65
/// Column writer for a Parquet column, with one variant per physical type.
///
/// Each variant wraps the [`ColumnWriterImpl`] specialised for that type.
pub enum ColumnWriter<'a> {
    /// Writer for `BoolType` columns.
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    /// Writer for `Int32Type` columns.
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    /// Writer for `Int64Type` columns.
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    /// Writer for `Int96Type` columns.
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    /// Writer for `FloatType` columns.
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    /// Writer for `DoubleType` columns.
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    /// Writer for `ByteArrayType` columns.
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    /// Writer for `FixedLenByteArrayType` columns.
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}
85
86impl ColumnWriter<'_> {
87 #[cfg(feature = "arrow")]
89 pub(crate) fn memory_size(&self) -> usize {
90 downcast_writer!(self, typed, typed.memory_size())
91 }
92
93 #[cfg(feature = "arrow")]
95 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
96 downcast_writer!(self, typed, typed.get_estimated_total_bytes())
97 }
98
99 pub fn close(self) -> Result<ColumnCloseResult> {
101 downcast_writer!(self, typed, typed.close())
102 }
103}
104
105pub fn get_column_writer<'a>(
107 descr: ColumnDescPtr,
108 props: WriterPropertiesPtr,
109 page_writer: Box<dyn PageWriter + 'a>,
110) -> ColumnWriter<'a> {
111 match descr.physical_type() {
112 Type::BOOLEAN => {
113 ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
114 }
115 Type::INT32 => {
116 ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
117 }
118 Type::INT64 => {
119 ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
120 }
121 Type::INT96 => {
122 ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
123 }
124 Type::FLOAT => {
125 ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
126 }
127 Type::DOUBLE => {
128 ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
129 }
130 Type::BYTE_ARRAY => {
131 ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
132 }
133 Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
134 ColumnWriterImpl::new(descr, props, page_writer),
135 ),
136 }
137}
138
139pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
144 T::get_column_writer(col_writer).unwrap_or_else(|| {
145 panic!(
146 "Failed to convert column writer into a typed column writer for `{}` type",
147 T::get_physical_type()
148 )
149 })
150}
151
152pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
154 col_writer: &'b ColumnWriter<'a>,
155) -> &'b ColumnWriterImpl<'a, T> {
156 T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
157 panic!(
158 "Failed to convert column writer into a typed column writer for `{}` type",
159 T::get_physical_type()
160 )
161 })
162}
163
164pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
166 col_writer: &'a mut ColumnWriter<'b>,
167) -> &'a mut ColumnWriterImpl<'b, T> {
168 T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
169 panic!(
170 "Failed to convert column writer into a typed column writer for `{}` type",
171 T::get_physical_type()
172 )
173 })
174}
175
/// Everything produced when a column writer is closed.
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    /// Total number of bytes written for this column.
    pub bytes_written: u64,
    /// Total number of rows written for this column.
    pub rows_written: u64,
    /// Metadata built for the column chunk.
    pub metadata: ColumnChunkMetaData,
    /// Bloom filter flushed from the value encoder, if it produced one.
    pub bloom_filter: Option<Sbbf>,
    /// Column index; `None` when the column index builder was not valid at close time.
    pub column_index: Option<ColumnIndex>,
    /// Offset index; `None` when no offset index builder was in use.
    pub offset_index: Option<OffsetIndex>,
}
192
/// Counters and histograms for the data page that is currently being buffered.
#[derive(Default)]
struct PageMetrics {
    /// Number of levels buffered for the current page (null entries are included).
    num_buffered_values: u32,
    /// Number of rows buffered for the current page.
    num_buffered_rows: u32,
    /// Number of buffered entries whose definition level is below the column maximum.
    num_page_nulls: u64,
    /// Histogram of repetition levels for the current page, when enabled.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Histogram of definition levels for the current page, when enabled.
    definition_level_histogram: Option<LevelHistogram>,
}
202
203impl PageMetrics {
204 fn new() -> Self {
205 Default::default()
206 }
207
208 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
210 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
211 self
212 }
213
214 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
216 self.definition_level_histogram = LevelHistogram::try_new(max_level);
217 self
218 }
219
220 fn new_page(&mut self) {
223 self.num_buffered_values = 0;
224 self.num_buffered_rows = 0;
225 self.num_page_nulls = 0;
226 self.repetition_level_histogram
227 .as_mut()
228 .map(LevelHistogram::reset);
229 self.definition_level_histogram
230 .as_mut()
231 .map(LevelHistogram::reset);
232 }
233
234 fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
236 if let Some(ref mut rep_hist) = self.repetition_level_histogram {
237 rep_hist.update_from_levels(levels);
238 }
239 }
240
241 fn update_definition_level_histogram(&mut self, levels: &[i16]) {
243 if let Some(ref mut def_hist) = self.definition_level_histogram {
244 def_hist.update_from_levels(levels);
245 }
246 }
247}
248
/// Counters, offsets and statistics accumulated over a whole column chunk.
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    /// Sum of `bytes_written` over every page written so far.
    total_bytes_written: u64,
    /// Sum of the row counts of every data page added so far.
    total_rows_written: u64,
    /// Sum of `uncompressed_size` over every page written so far.
    total_uncompressed_size: u64,
    /// Sum of `compressed_size` over every page written so far.
    total_compressed_size: u64,
    /// Sum of `num_values` over every data page written so far.
    total_num_values: u64,
    /// Offset of the dictionary page, once one has been written.
    dictionary_page_offset: Option<u64>,
    /// Offset of the first data page written.
    data_page_offset: Option<u64>,
    /// Minimum value seen for the column so far.
    min_column_value: Option<T>,
    /// Maximum value seen for the column so far.
    max_column_value: Option<T>,
    /// Sum of the null counts of every data page added so far.
    num_column_nulls: u64,
    /// Distinct count supplied by the caller, if it is still considered usable.
    column_distinct_count: Option<u64>,
    /// Running total of the per-page variable length byte counts, when reported.
    variable_length_bytes: Option<i64>,
    /// Repetition level histogram for the column chunk, when enabled.
    repetition_level_histogram: Option<LevelHistogram>,
    /// Definition level histogram for the column chunk, when enabled.
    definition_level_histogram: Option<LevelHistogram>,
}
267
268impl<T: Default> ColumnMetrics<T> {
269 fn new() -> Self {
270 Default::default()
271 }
272
273 fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
275 self.repetition_level_histogram = LevelHistogram::try_new(max_level);
276 self
277 }
278
279 fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
281 self.definition_level_histogram = LevelHistogram::try_new(max_level);
282 self
283 }
284
285 fn update_histogram(
287 chunk_histogram: &mut Option<LevelHistogram>,
288 page_histogram: &Option<LevelHistogram>,
289 ) {
290 if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
291 chunk_hist.add(page_hist);
292 }
293 }
294
295 fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
298 ColumnMetrics::<T>::update_histogram(
299 &mut self.definition_level_histogram,
300 &page_metrics.definition_level_histogram,
301 );
302 ColumnMetrics::<T>::update_histogram(
303 &mut self.repetition_level_histogram,
304 &page_metrics.repetition_level_histogram,
305 );
306 }
307
308 fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
310 if let Some(var_bytes) = variable_length_bytes {
311 *self.variable_length_bytes.get_or_insert(0) += var_bytes;
312 }
313 }
314}
315
/// Typed column writer: a [`GenericColumnWriter`] using [`ColumnValueEncoderImpl`] for `T`.
pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;
318
/// Column writer that is generic over the value encoder `E`.
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    /// Descriptor of the column being written.
    descr: ColumnDescPtr,
    /// Writer properties the writer was created with.
    props: WriterPropertiesPtr,
    /// Statistics level configured in `props` for this column's path.
    statistics_enabled: EnabledStatistics,

    /// Destination for finished pages.
    page_writer: Box<dyn PageWriter + 'a>,
    /// Compression configured in `props` for this column's path.
    codec: Compression,
    /// Codec created for `codec`; page buffers are left uncompressed when this is `None`.
    compressor: Option<Box<dyn Codec>>,
    /// Encoder that buffers and encodes the column values.
    encoder: E,

    /// Metrics for the data page currently being buffered.
    page_metrics: PageMetrics,
    /// Metrics accumulated over the whole column chunk.
    column_metrics: ColumnMetrics<E::T>,

    /// Encodings of the pages written so far; seeded with `Encoding::RLE`.
    encodings: BTreeSet<Encoding>,
    /// Counts of consecutive pages sharing the same page type and encoding.
    encoding_stats: Vec<PageEncodingStats>,
    /// Definition levels buffered for the current page.
    def_levels_sink: Vec<i16>,
    /// Repetition levels buffered for the current page.
    rep_levels_sink: Vec<i16>,
    /// Finished data pages held back while the encoder still has a dictionary.
    data_pages: VecDeque<CompressedPage>,
    /// Builder for the column index; marked invalid when it must not be emitted.
    column_index_builder: ColumnIndexBuilder,
    /// Builder for the offset index; `None` when the offset index is disabled.
    offset_index_builder: Option<OffsetIndexBuilder>,

    /// Stays `true` while no later page has had a smaller min or max than its predecessor.
    data_page_boundary_ascending: bool,
    /// Stays `true` while no later page has had a larger min or max than its predecessor.
    data_page_boundary_descending: bool,
    /// Min and max of the most recent data page that had page statistics.
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}
354
355impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
356 pub fn new(
358 descr: ColumnDescPtr,
359 props: WriterPropertiesPtr,
360 page_writer: Box<dyn PageWriter + 'a>,
361 ) -> Self {
362 let codec = props.compression(descr.path());
363 let codec_options = CodecOptionsBuilder::default().build();
364 let compressor = create_codec(codec, &codec_options).unwrap();
365 let encoder = E::try_new(&descr, props.as_ref()).unwrap();
366
367 let statistics_enabled = props.statistics_enabled(descr.path());
368
369 let mut encodings = BTreeSet::new();
370 encodings.insert(Encoding::RLE);
372
373 let mut page_metrics = PageMetrics::new();
374 let mut column_metrics = ColumnMetrics::<E::T>::new();
375
376 if statistics_enabled != EnabledStatistics::None {
378 page_metrics = page_metrics
379 .with_repetition_level_histogram(descr.max_rep_level())
380 .with_definition_level_histogram(descr.max_def_level());
381 column_metrics = column_metrics
382 .with_repetition_level_histogram(descr.max_rep_level())
383 .with_definition_level_histogram(descr.max_def_level())
384 }
385
386 let mut column_index_builder = ColumnIndexBuilder::new();
388 if statistics_enabled != EnabledStatistics::Page {
389 column_index_builder.to_invalid()
390 }
391
392 let offset_index_builder = match props.offset_index_disabled() {
394 false => Some(OffsetIndexBuilder::new()),
395 _ => None,
396 };
397
398 Self {
399 descr,
400 props,
401 statistics_enabled,
402 page_writer,
403 codec,
404 compressor,
405 encoder,
406 def_levels_sink: vec![],
407 rep_levels_sink: vec![],
408 data_pages: VecDeque::new(),
409 page_metrics,
410 column_metrics,
411 column_index_builder,
412 offset_index_builder,
413 encodings,
414 encoding_stats: vec![],
415 data_page_boundary_ascending: true,
416 data_page_boundary_descending: true,
417 last_non_null_data_page_min_max: None,
418 }
419 }
420
421 #[allow(clippy::too_many_arguments)]
422 pub(crate) fn write_batch_internal(
423 &mut self,
424 values: &E::Values,
425 value_indices: Option<&[usize]>,
426 def_levels: Option<&[i16]>,
427 rep_levels: Option<&[i16]>,
428 min: Option<&E::T>,
429 max: Option<&E::T>,
430 distinct_count: Option<u64>,
431 ) -> Result<usize> {
432 if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
434 if def.len() != rep.len() {
435 return Err(general_err!(
436 "Inconsistent length of definition and repetition levels: {} != {}",
437 def.len(),
438 rep.len()
439 ));
440 }
441 }
442
443 let num_levels = match def_levels {
454 Some(def_levels) => def_levels.len(),
455 None => values.len(),
456 };
457
458 if let Some(min) = min {
459 update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
460 }
461 if let Some(max) = max {
462 update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
463 }
464
465 if self.encoder.num_values() == 0 {
467 self.column_metrics.column_distinct_count = distinct_count;
468 } else {
469 self.column_metrics.column_distinct_count = None;
470 }
471
472 let mut values_offset = 0;
473 let mut levels_offset = 0;
474 let base_batch_size = self.props.write_batch_size();
475 while levels_offset < num_levels {
476 let mut end_offset = num_levels.min(levels_offset + base_batch_size);
477
478 if let Some(r) = rep_levels {
480 while end_offset < r.len() && r[end_offset] != 0 {
481 end_offset += 1;
482 }
483 }
484
485 values_offset += self.write_mini_batch(
486 values,
487 values_offset,
488 value_indices,
489 end_offset - levels_offset,
490 def_levels.map(|lv| &lv[levels_offset..end_offset]),
491 rep_levels.map(|lv| &lv[levels_offset..end_offset]),
492 )?;
493 levels_offset = end_offset;
494 }
495
496 Ok(values_offset)
498 }
499
500 pub fn write_batch(
513 &mut self,
514 values: &E::Values,
515 def_levels: Option<&[i16]>,
516 rep_levels: Option<&[i16]>,
517 ) -> Result<usize> {
518 self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
519 }
520
521 pub fn write_batch_with_statistics(
529 &mut self,
530 values: &E::Values,
531 def_levels: Option<&[i16]>,
532 rep_levels: Option<&[i16]>,
533 min: Option<&E::T>,
534 max: Option<&E::T>,
535 distinct_count: Option<u64>,
536 ) -> Result<usize> {
537 self.write_batch_internal(
538 values,
539 None,
540 def_levels,
541 rep_levels,
542 min,
543 max,
544 distinct_count,
545 )
546 }
547
548 #[cfg(feature = "arrow")]
553 pub(crate) fn memory_size(&self) -> usize {
554 self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
555 }
556
557 pub fn get_total_bytes_written(&self) -> u64 {
563 self.column_metrics.total_bytes_written
564 }
565
566 #[cfg(feature = "arrow")]
572 pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
573 self.data_pages
574 .iter()
575 .map(|page| page.data().len() as u64)
576 .sum::<u64>()
577 + self.column_metrics.total_bytes_written
578 + self.encoder.estimated_data_page_size() as u64
579 + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
580 }
581
582 pub fn get_total_rows_written(&self) -> u64 {
585 self.column_metrics.total_rows_written
586 }
587
588 pub fn get_descriptor(&self) -> &ColumnDescPtr {
590 &self.descr
591 }
592
593 pub fn close(mut self) -> Result<ColumnCloseResult> {
596 if self.page_metrics.num_buffered_values > 0 {
597 self.add_data_page()?;
598 }
599 if self.encoder.has_dictionary() {
600 self.write_dictionary_page()?;
601 }
602 self.flush_data_pages()?;
603 let metadata = self.build_column_metadata()?;
604 self.page_writer.close()?;
605
606 let boundary_order = match (
607 self.data_page_boundary_ascending,
608 self.data_page_boundary_descending,
609 ) {
610 (true, _) => BoundaryOrder::ASCENDING,
613 (false, true) => BoundaryOrder::DESCENDING,
614 (false, false) => BoundaryOrder::UNORDERED,
615 };
616 self.column_index_builder.set_boundary_order(boundary_order);
617
618 let column_index = self
619 .column_index_builder
620 .valid()
621 .then(|| self.column_index_builder.build_to_thrift());
622
623 let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());
624
625 Ok(ColumnCloseResult {
626 bytes_written: self.column_metrics.total_bytes_written,
627 rows_written: self.column_metrics.total_rows_written,
628 bloom_filter: self.encoder.flush_bloom_filter(),
629 metadata,
630 column_index,
631 offset_index,
632 })
633 }
634
635 fn write_mini_batch(
639 &mut self,
640 values: &E::Values,
641 values_offset: usize,
642 value_indices: Option<&[usize]>,
643 num_levels: usize,
644 def_levels: Option<&[i16]>,
645 rep_levels: Option<&[i16]>,
646 ) -> Result<usize> {
647 let values_to_write = if self.descr.max_def_level() > 0 {
649 let levels = def_levels.ok_or_else(|| {
650 general_err!(
651 "Definition levels are required, because max definition level = {}",
652 self.descr.max_def_level()
653 )
654 })?;
655
656 let values_to_write = levels
657 .iter()
658 .map(|level| (*level == self.descr.max_def_level()) as usize)
659 .sum();
660 self.page_metrics.num_page_nulls += (levels.len() - values_to_write) as u64;
661
662 self.page_metrics.update_definition_level_histogram(levels);
664
665 self.def_levels_sink.extend_from_slice(levels);
666 values_to_write
667 } else {
668 num_levels
669 };
670
671 if self.descr.max_rep_level() > 0 {
673 let levels = rep_levels.ok_or_else(|| {
675 general_err!(
676 "Repetition levels are required, because max repetition level = {}",
677 self.descr.max_rep_level()
678 )
679 })?;
680
681 if !levels.is_empty() && levels[0] != 0 {
682 return Err(general_err!(
683 "Write must start at a record boundary, got non-zero repetition level of {}",
684 levels[0]
685 ));
686 }
687
688 for &level in levels {
690 self.page_metrics.num_buffered_rows += (level == 0) as u32
691 }
692
693 self.page_metrics.update_repetition_level_histogram(levels);
695
696 self.rep_levels_sink.extend_from_slice(levels);
697 } else {
698 self.page_metrics.num_buffered_rows += num_levels as u32;
701 }
702
703 match value_indices {
704 Some(indices) => {
705 let indices = &indices[values_offset..values_offset + values_to_write];
706 self.encoder.write_gather(values, indices)?;
707 }
708 None => self.encoder.write(values, values_offset, values_to_write)?,
709 }
710
711 self.page_metrics.num_buffered_values += num_levels as u32;
712
713 if self.should_add_data_page() {
714 self.add_data_page()?;
715 }
716
717 if self.should_dict_fallback() {
718 self.dict_fallback()?;
719 }
720
721 Ok(values_to_write)
722 }
723
724 #[inline]
729 fn should_dict_fallback(&self) -> bool {
730 match self.encoder.estimated_dict_page_size() {
731 Some(size) => {
732 size >= self
733 .props
734 .column_dictionary_page_size_limit(self.descr.path())
735 }
736 None => false,
737 }
738 }
739
740 #[inline]
742 fn should_add_data_page(&self) -> bool {
743 if self.page_metrics.num_buffered_values == 0 {
748 return false;
749 }
750
751 self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
752 || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
753 }
754
755 fn dict_fallback(&mut self) -> Result<()> {
758 if self.page_metrics.num_buffered_values > 0 {
760 self.add_data_page()?;
761 }
762 self.write_dictionary_page()?;
763 self.flush_data_pages()?;
764 Ok(())
765 }
766
767 fn update_column_offset_index(
769 &mut self,
770 page_statistics: Option<&ValueStatistics<E::T>>,
771 page_variable_length_bytes: Option<i64>,
772 ) {
773 let null_page =
775 (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
776 if null_page && self.column_index_builder.valid() {
779 self.column_index_builder.append(
780 null_page,
781 vec![],
782 vec![],
783 self.page_metrics.num_page_nulls as i64,
784 );
785 } else if self.column_index_builder.valid() {
786 match &page_statistics {
789 None => {
790 self.column_index_builder.to_invalid();
791 }
792 Some(stat) => {
793 let new_min = stat.min_opt().unwrap();
795 let new_max = stat.max_opt().unwrap();
796 if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
797 if self.data_page_boundary_ascending {
798 let not_ascending = compare_greater(&self.descr, last_min, new_min)
800 || compare_greater(&self.descr, last_max, new_max);
801 if not_ascending {
802 self.data_page_boundary_ascending = false;
803 }
804 }
805
806 if self.data_page_boundary_descending {
807 let not_descending = compare_greater(&self.descr, new_min, last_min)
809 || compare_greater(&self.descr, new_max, last_max);
810 if not_descending {
811 self.data_page_boundary_descending = false;
812 }
813 }
814 }
815 self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
816
817 if self.can_truncate_value() {
818 self.column_index_builder.append(
819 null_page,
820 self.truncate_min_value(
821 self.props.column_index_truncate_length(),
822 stat.min_bytes_opt().unwrap(),
823 )
824 .0,
825 self.truncate_max_value(
826 self.props.column_index_truncate_length(),
827 stat.max_bytes_opt().unwrap(),
828 )
829 .0,
830 self.page_metrics.num_page_nulls as i64,
831 );
832 } else {
833 self.column_index_builder.append(
834 null_page,
835 stat.min_bytes_opt().unwrap().to_vec(),
836 stat.max_bytes_opt().unwrap().to_vec(),
837 self.page_metrics.num_page_nulls as i64,
838 );
839 }
840 }
841 }
842 }
843
844 self.column_index_builder.append_histograms(
846 &self.page_metrics.repetition_level_histogram,
847 &self.page_metrics.definition_level_histogram,
848 );
849
850 if let Some(builder) = self.offset_index_builder.as_mut() {
852 builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
853 builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
854 }
855 }
856
857 fn can_truncate_value(&self) -> bool {
859 match self.descr.physical_type() {
860 Type::FIXED_LEN_BYTE_ARRAY
864 if !matches!(
865 self.descr.logical_type(),
866 Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
867 ) =>
868 {
869 true
870 }
871 Type::BYTE_ARRAY => true,
872 _ => false,
874 }
875 }
876
877 fn is_utf8(&self) -> bool {
879 self.get_descriptor().logical_type() == Some(LogicalType::String)
880 || self.get_descriptor().converted_type() == ConvertedType::UTF8
881 }
882
883 fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
894 truncation_length
895 .filter(|l| data.len() > *l)
896 .and_then(|l|
897 if self.is_utf8() {
899 match str::from_utf8(data) {
900 Ok(str_data) => truncate_utf8(str_data, l),
901 Err(_) => Some(data[..l].to_vec()),
902 }
903 } else {
904 Some(data[..l].to_vec())
905 }
906 )
907 .map(|truncated| (truncated, true))
908 .unwrap_or_else(|| (data.to_vec(), false))
909 }
910
911 fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
925 truncation_length
926 .filter(|l| data.len() > *l)
927 .and_then(|l|
928 if self.is_utf8() {
930 match str::from_utf8(data) {
931 Ok(str_data) => truncate_and_increment_utf8(str_data, l),
932 Err(_) => increment(data[..l].to_vec()),
933 }
934 } else {
935 increment(data[..l].to_vec())
936 }
937 )
938 .map(|truncated| (truncated, true))
939 .unwrap_or_else(|| (data.to_vec(), false))
940 }
941
942 fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
945 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
946 match statistics {
947 Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
948 let (min, did_truncate_min) = self.truncate_min_value(
949 self.props.statistics_truncate_length(),
950 stats.min_bytes_opt().unwrap(),
951 );
952 let (max, did_truncate_max) = self.truncate_max_value(
953 self.props.statistics_truncate_length(),
954 stats.max_bytes_opt().unwrap(),
955 );
956 Statistics::ByteArray(
957 ValueStatistics::new(
958 Some(min.into()),
959 Some(max.into()),
960 stats.distinct_count(),
961 stats.null_count_opt(),
962 backwards_compatible_min_max,
963 )
964 .with_max_is_exact(!did_truncate_max)
965 .with_min_is_exact(!did_truncate_min),
966 )
967 }
968 Statistics::FixedLenByteArray(stats)
969 if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
970 {
971 let (min, did_truncate_min) = self.truncate_min_value(
972 self.props.statistics_truncate_length(),
973 stats.min_bytes_opt().unwrap(),
974 );
975 let (max, did_truncate_max) = self.truncate_max_value(
976 self.props.statistics_truncate_length(),
977 stats.max_bytes_opt().unwrap(),
978 );
979 Statistics::FixedLenByteArray(
980 ValueStatistics::new(
981 Some(min.into()),
982 Some(max.into()),
983 stats.distinct_count(),
984 stats.null_count_opt(),
985 backwards_compatible_min_max,
986 )
987 .with_max_is_exact(!did_truncate_max)
988 .with_min_is_exact(!did_truncate_min),
989 )
990 }
991 stats => stats,
992 }
993 }
994
995 fn add_data_page(&mut self) -> Result<()> {
998 let values_data = self.encoder.flush_data_page()?;
1000
1001 let max_def_level = self.descr.max_def_level();
1002 let max_rep_level = self.descr.max_rep_level();
1003
1004 self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;
1005
1006 let page_statistics = match (values_data.min_value, values_data.max_value) {
1007 (Some(min), Some(max)) => {
1008 update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
1010 update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
1011
1012 (self.statistics_enabled == EnabledStatistics::Page).then_some(
1013 ValueStatistics::new(
1014 Some(min),
1015 Some(max),
1016 None,
1017 Some(self.page_metrics.num_page_nulls),
1018 false,
1019 ),
1020 )
1021 }
1022 _ => None,
1023 };
1024
1025 self.update_column_offset_index(
1027 page_statistics.as_ref(),
1028 values_data.variable_length_bytes,
1029 );
1030
1031 self.column_metrics
1033 .update_from_page_metrics(&self.page_metrics);
1034 self.column_metrics
1035 .update_variable_length_bytes(values_data.variable_length_bytes);
1036
1037 let page_statistics = page_statistics
1039 .filter(|_| self.props.write_page_header_statistics(self.descr.path()))
1040 .map(|stats| self.truncate_statistics(Statistics::from(stats)));
1041
1042 let compressed_page = match self.props.writer_version() {
1043 WriterVersion::PARQUET_1_0 => {
1044 let mut buffer = vec![];
1045
1046 if max_rep_level > 0 {
1047 buffer.extend_from_slice(
1048 &self.encode_levels_v1(
1049 Encoding::RLE,
1050 &self.rep_levels_sink[..],
1051 max_rep_level,
1052 )[..],
1053 );
1054 }
1055
1056 if max_def_level > 0 {
1057 buffer.extend_from_slice(
1058 &self.encode_levels_v1(
1059 Encoding::RLE,
1060 &self.def_levels_sink[..],
1061 max_def_level,
1062 )[..],
1063 );
1064 }
1065
1066 buffer.extend_from_slice(&values_data.buf);
1067 let uncompressed_size = buffer.len();
1068
1069 if let Some(ref mut cmpr) = self.compressor {
1070 let mut compressed_buf = Vec::with_capacity(uncompressed_size);
1071 cmpr.compress(&buffer[..], &mut compressed_buf)?;
1072 buffer = compressed_buf;
1073 }
1074
1075 let data_page = Page::DataPage {
1076 buf: buffer.into(),
1077 num_values: self.page_metrics.num_buffered_values,
1078 encoding: values_data.encoding,
1079 def_level_encoding: Encoding::RLE,
1080 rep_level_encoding: Encoding::RLE,
1081 statistics: page_statistics,
1082 };
1083
1084 CompressedPage::new(data_page, uncompressed_size)
1085 }
1086 WriterVersion::PARQUET_2_0 => {
1087 let mut rep_levels_byte_len = 0;
1088 let mut def_levels_byte_len = 0;
1089 let mut buffer = vec![];
1090
1091 if max_rep_level > 0 {
1092 let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
1093 rep_levels_byte_len = levels.len();
1094 buffer.extend_from_slice(&levels[..]);
1095 }
1096
1097 if max_def_level > 0 {
1098 let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
1099 def_levels_byte_len = levels.len();
1100 buffer.extend_from_slice(&levels[..]);
1101 }
1102
1103 let uncompressed_size =
1104 rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();
1105
1106 match self.compressor {
1108 Some(ref mut cmpr) => {
1109 cmpr.compress(&values_data.buf, &mut buffer)?;
1110 }
1111 None => buffer.extend_from_slice(&values_data.buf),
1112 }
1113
1114 let data_page = Page::DataPageV2 {
1115 buf: buffer.into(),
1116 num_values: self.page_metrics.num_buffered_values,
1117 encoding: values_data.encoding,
1118 num_nulls: self.page_metrics.num_page_nulls as u32,
1119 num_rows: self.page_metrics.num_buffered_rows,
1120 def_levels_byte_len: def_levels_byte_len as u32,
1121 rep_levels_byte_len: rep_levels_byte_len as u32,
1122 is_compressed: self.compressor.is_some(),
1123 statistics: page_statistics,
1124 };
1125
1126 CompressedPage::new(data_page, uncompressed_size)
1127 }
1128 };
1129
1130 if self.encoder.has_dictionary() {
1132 self.data_pages.push_back(compressed_page);
1133 } else {
1134 self.write_data_page(compressed_page)?;
1135 }
1136
1137 self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
1139
1140 self.rep_levels_sink.clear();
1142 self.def_levels_sink.clear();
1143 self.page_metrics.new_page();
1144
1145 Ok(())
1146 }
1147
1148 #[inline]
1151 fn flush_data_pages(&mut self) -> Result<()> {
1152 if self.page_metrics.num_buffered_values > 0 {
1154 self.add_data_page()?;
1155 }
1156
1157 while let Some(page) = self.data_pages.pop_front() {
1158 self.write_data_page(page)?;
1159 }
1160
1161 Ok(())
1162 }
1163
1164 fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
1166 let total_compressed_size = self.column_metrics.total_compressed_size as i64;
1167 let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
1168 let num_values = self.column_metrics.total_num_values as i64;
1169 let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
1170 let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;
1172
1173 let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
1174 .set_compression(self.codec)
1175 .set_encodings(self.encodings.iter().cloned().collect())
1176 .set_page_encoding_stats(self.encoding_stats.clone())
1177 .set_total_compressed_size(total_compressed_size)
1178 .set_total_uncompressed_size(total_uncompressed_size)
1179 .set_num_values(num_values)
1180 .set_data_page_offset(data_page_offset)
1181 .set_dictionary_page_offset(dict_page_offset);
1182
1183 if self.statistics_enabled != EnabledStatistics::None {
1184 let backwards_compatible_min_max = self.descr.sort_order().is_signed();
1185
1186 let statistics = ValueStatistics::<E::T>::new(
1187 self.column_metrics.min_column_value.clone(),
1188 self.column_metrics.max_column_value.clone(),
1189 self.column_metrics.column_distinct_count,
1190 Some(self.column_metrics.num_column_nulls),
1191 false,
1192 )
1193 .with_backwards_compatible_min_max(backwards_compatible_min_max)
1194 .into();
1195
1196 let statistics = self.truncate_statistics(statistics);
1197
1198 builder = builder
1199 .set_statistics(statistics)
1200 .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
1201 .set_repetition_level_histogram(
1202 self.column_metrics.repetition_level_histogram.take(),
1203 )
1204 .set_definition_level_histogram(
1205 self.column_metrics.definition_level_histogram.take(),
1206 );
1207 }
1208
1209 builder = self.set_column_chunk_encryption_properties(builder);
1210
1211 let metadata = builder.build()?;
1212 Ok(metadata)
1213 }
1214
1215 #[inline]
1217 fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
1218 let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
1219 encoder.put(levels);
1220 encoder.consume()
1221 }
1222
1223 #[inline]
1226 fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
1227 let mut encoder = LevelEncoder::v2(max_level, levels.len());
1228 encoder.put(levels);
1229 encoder.consume()
1230 }
1231
1232 #[inline]
1234 fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
1235 self.encodings.insert(page.encoding());
1236 match self.encoding_stats.last_mut() {
1237 Some(encoding_stats)
1238 if encoding_stats.page_type == page.page_type()
1239 && encoding_stats.encoding == page.encoding() =>
1240 {
1241 encoding_stats.count += 1;
1242 }
1243 _ => {
1244 self.encoding_stats.push(PageEncodingStats {
1247 page_type: page.page_type(),
1248 encoding: page.encoding(),
1249 count: 1,
1250 });
1251 }
1252 }
1253 let page_spec = self.page_writer.write_page(page)?;
1254 if let Some(builder) = self.offset_index_builder.as_mut() {
1257 builder
1258 .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
1259 }
1260 self.update_metrics_for_page(page_spec);
1261 Ok(())
1262 }
1263
1264 #[inline]
1266 fn write_dictionary_page(&mut self) -> Result<()> {
1267 let compressed_page = {
1268 let mut page = self
1269 .encoder
1270 .flush_dict_page()?
1271 .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;
1272
1273 let uncompressed_size = page.buf.len();
1274
1275 if let Some(ref mut cmpr) = self.compressor {
1276 let mut output_buf = Vec::with_capacity(uncompressed_size);
1277 cmpr.compress(&page.buf, &mut output_buf)?;
1278 page.buf = Bytes::from(output_buf);
1279 }
1280
1281 let dict_page = Page::DictionaryPage {
1282 buf: page.buf,
1283 num_values: page.num_values as u32,
1284 encoding: self.props.dictionary_page_encoding(),
1285 is_sorted: page.is_sorted,
1286 };
1287 CompressedPage::new(dict_page, uncompressed_size)
1288 };
1289
1290 self.encodings.insert(compressed_page.encoding());
1291 self.encoding_stats.push(PageEncodingStats {
1292 page_type: PageType::DICTIONARY_PAGE,
1293 encoding: compressed_page.encoding(),
1294 count: 1,
1295 });
1296 let page_spec = self.page_writer.write_page(compressed_page)?;
1297 self.update_metrics_for_page(page_spec);
1298 Ok(())
1300 }
1301
1302 #[inline]
1304 fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
1305 self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
1306 self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
1307 self.column_metrics.total_bytes_written += page_spec.bytes_written;
1308
1309 match page_spec.page_type {
1310 PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
1311 self.column_metrics.total_num_values += page_spec.num_values as u64;
1312 if self.column_metrics.data_page_offset.is_none() {
1313 self.column_metrics.data_page_offset = Some(page_spec.offset);
1314 }
1315 }
1316 PageType::DICTIONARY_PAGE => {
1317 assert!(
1318 self.column_metrics.dictionary_page_offset.is_none(),
1319 "Dictionary offset is already set"
1320 );
1321 self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
1322 }
1323 _ => {}
1324 }
1325 }
1326
1327 #[inline]
1328 #[cfg(feature = "encryption")]
1329 fn set_column_chunk_encryption_properties(
1330 &self,
1331 builder: ColumnChunkMetaDataBuilder,
1332 ) -> ColumnChunkMetaDataBuilder {
1333 if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
1334 builder.set_column_crypto_metadata(get_column_crypto_metadata(
1335 encryption_properties,
1336 &self.descr,
1337 ))
1338 } else {
1339 builder
1340 }
1341 }
1342
    /// Variant compiled when the `encryption` feature is disabled: returns `builder`
    /// unchanged, so callers can invoke this method regardless of the feature flag.
    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
1351}
1352
1353fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
1354 update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
1355}
1356
1357fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
1358 update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
1359}
1360
1361#[inline]
1362#[allow(clippy::eq_op)]
1363fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
1364 match T::PHYSICAL_TYPE {
1365 Type::FLOAT | Type::DOUBLE => val != val,
1366 Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
1367 let val = val.as_bytes();
1368 let val = f16::from_le_bytes([val[0], val[1]]);
1369 val.is_nan()
1370 }
1371 _ => false,
1372 }
1373}
1374
1375fn update_stat<T: ParquetValueType, F>(
1380 descr: &ColumnDescriptor,
1381 val: &T,
1382 cur: &mut Option<T>,
1383 should_update: F,
1384) where
1385 F: Fn(&T) -> bool,
1386{
1387 if is_nan(descr, val) {
1388 return;
1389 }
1390
1391 if cur.as_ref().map_or(true, should_update) {
1392 *cur = Some(val.clone());
1393 }
1394}
1395
1396fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
1398 if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
1399 if !is_signed {
1400 return a.as_u64().unwrap() > b.as_u64().unwrap();
1402 }
1403 }
1404
1405 match descr.converted_type() {
1406 ConvertedType::UINT_8
1407 | ConvertedType::UINT_16
1408 | ConvertedType::UINT_32
1409 | ConvertedType::UINT_64 => {
1410 return a.as_u64().unwrap() > b.as_u64().unwrap();
1411 }
1412 _ => {}
1413 };
1414
1415 if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
1416 match T::PHYSICAL_TYPE {
1417 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1418 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1419 }
1420 _ => {}
1421 };
1422 }
1423
1424 if descr.converted_type() == ConvertedType::DECIMAL {
1425 match T::PHYSICAL_TYPE {
1426 Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
1427 return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
1428 }
1429 _ => {}
1430 };
1431 };
1432
1433 if let Some(LogicalType::Float16) = descr.logical_type() {
1434 let a = a.as_bytes();
1435 let a = f16::from_le_bytes([a[0], a[1]]);
1436 let b = b.as_bytes();
1437 let b = f16::from_le_bytes([b[0], b[1]]);
1438 return a > b;
1439 }
1440
1441 a > b
1442}
1443
1444fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
1452 match (kind, props.writer_version()) {
1453 (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
1454 (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1455 (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
1456 (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1457 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
1458 _ => Encoding::PLAIN,
1459 }
1460}
1461
1462fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
1464 match (kind, props.writer_version()) {
1465 (Type::BOOLEAN, _) => false,
1467 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
1469 (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
1470 _ => true,
1471 }
1472}
1473
/// Signed comparison of two big-endian two's-complement byte strings, returning
/// `true` when `a` is strictly greater than `b`.
///
/// The inputs may have different lengths; the shorter one is treated as if it were
/// sign-extended to the length of the longer one. An empty slice compares below any
/// non-empty slice, and two empty slices are not greater than each other.
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    // Short circuit for values of different sign, or for equal-length values whose
    // first bytes differ. The equal-length requirement matters because a longer value
    // may merely carry sign-extension bytes in front.
    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    if a_length == b_length {
        // First bytes are equal here, so an unsigned comparison of the rest decides.
        return a[1..] > b[1..];
    }

    // Same sign, different lengths: split the longer value into its extra leading
    // bytes and a tail that lines up byte-for-byte with the shorter value.
    let negative_values = (first_a as i8) < 0;
    let extension: u8 = if negative_values { 0xFF } else { 0 };
    let a_longer = a_length > b_length;

    let (lead, a_tail, b_tail) = if a_longer {
        let (lead, tail) = a.split_at(a_length - b_length);
        (lead, tail, b)
    } else {
        let (lead, tail) = b.split_at(b_length - a_length);
        (lead, a, tail)
    };

    if lead.iter().any(|&x| x != extension) {
        // The longer value has significant bytes beyond the shorter one's width, so
        // it has the larger magnitude: greater if positive, smaller if negative.
        return if negative_values { !a_longer } else { a_longer };
    }

    // The extra bytes are pure sign extension, so both values are equal above the
    // shared width and the aligned tails compare as unsigned big-endian numbers.
    a_tail > b_tail
}
1519
/// Returns the bytes of the longest prefix of `data` that is at most `length` bytes
/// long, non-empty, and ends on a UTF-8 character boundary.
///
/// Returns `None` when no such prefix exists (for example `length == 0`, or the
/// first character alone is wider than `length`).
fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    (1..=length)
        .rev()
        .find(|&end| data.is_char_boundary(end))
        .map(|end| data.as_bytes()[..end].to_vec())
}
1529
1530fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
1536 let lower_bound = length.saturating_sub(3);
1538 let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
1539 increment_utf8(data.get(..split)?)
1540}
1541
/// Produces the bytes of a string obtained by incrementing one character of `data`.
///
/// Characters are tried from last to first. The first one whose successor code point
/// is a valid `char` with the same encoded width is replaced by that successor, and
/// everything after it is dropped. Returns `None` if no character qualifies.
fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    data.char_indices().rev().find_map(|(start, ch)| {
        let width = ch.len_utf8();
        // The successor must keep the same byte width as the original character.
        let bumped = char::from_u32(ch as u32 + 1).filter(|next| next.len_utf8() == width)?;

        let mut scratch = [0u8; 4];
        let mut out = Vec::with_capacity(start + width);
        out.extend_from_slice(&data.as_bytes()[..start]);
        out.extend_from_slice(bumped.encode_utf8(&mut scratch).as_bytes());
        Some(out)
    })
}
1563
/// Treats `data` as a big-endian unsigned number and adds one to it.
///
/// Returns the incremented bytes, or `None` when the input is empty or every byte
/// is `0xFF` (the addition would overflow the available width).
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    // The right-most byte that can absorb the carry.
    let pivot = data.iter().rposition(|&byte| byte != u8::MAX)?;
    data[pivot] += 1;
    // Every byte after the pivot was 0xFF and wraps to zero.
    for wrapped in &mut data[pivot + 1..] {
        *wrapped = 0;
    }
    Some(data)
}
1579
1580#[cfg(test)]
1581mod tests {
1582 use crate::{
1583 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1584 schema::parser::parse_message_type,
1585 };
1586 use core::str;
1587 use rand::distr::uniform::SampleUniform;
1588 use std::{fs::File, sync::Arc};
1589
1590 use crate::column::{
1591 page::PageReader,
1592 reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1593 };
1594 use crate::file::writer::TrackedWrite;
1595 use crate::file::{
1596 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1597 };
1598 use crate::schema::types::{ColumnPath, Type as SchemaType};
1599 use crate::util::test_common::rand_gen::random_numbers_range;
1600
1601 use super::*;
1602
1603 #[test]
1604 fn test_column_writer_inconsistent_def_rep_length() {
1605 let page_writer = get_test_page_writer();
1606 let props = Default::default();
1607 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1608 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1609 assert!(res.is_err());
1610 if let Err(err) = res {
1611 assert_eq!(
1612 format!("{err}"),
1613 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1614 );
1615 }
1616 }
1617
1618 #[test]
1619 fn test_column_writer_invalid_def_levels() {
1620 let page_writer = get_test_page_writer();
1621 let props = Default::default();
1622 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1623 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1624 assert!(res.is_err());
1625 if let Err(err) = res {
1626 assert_eq!(
1627 format!("{err}"),
1628 "Parquet error: Definition levels are required, because max definition level = 1"
1629 );
1630 }
1631 }
1632
1633 #[test]
1634 fn test_column_writer_invalid_rep_levels() {
1635 let page_writer = get_test_page_writer();
1636 let props = Default::default();
1637 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1638 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1639 assert!(res.is_err());
1640 if let Err(err) = res {
1641 assert_eq!(
1642 format!("{err}"),
1643 "Parquet error: Repetition levels are required, because max repetition level = 1"
1644 );
1645 }
1646 }
1647
1648 #[test]
1649 fn test_column_writer_not_enough_values_to_write() {
1650 let page_writer = get_test_page_writer();
1651 let props = Default::default();
1652 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1653 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1654 assert!(res.is_err());
1655 if let Err(err) = res {
1656 assert_eq!(
1657 format!("{err}"),
1658 "Parquet error: Expected to write 4 values, but have only 2"
1659 );
1660 }
1661 }
1662
1663 #[test]
1664 fn test_column_writer_write_only_one_dictionary_page() {
1665 let page_writer = get_test_page_writer();
1666 let props = Default::default();
1667 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1668 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1669 writer.add_data_page().unwrap();
1671 writer.write_dictionary_page().unwrap();
1672 let err = writer.write_dictionary_page().unwrap_err().to_string();
1673 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1674 }
1675
1676 #[test]
1677 fn test_column_writer_error_when_writing_disabled_dictionary() {
1678 let page_writer = get_test_page_writer();
1679 let props = Arc::new(
1680 WriterProperties::builder()
1681 .set_dictionary_enabled(false)
1682 .build(),
1683 );
1684 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1685 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1686 let err = writer.write_dictionary_page().unwrap_err().to_string();
1687 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1688 }
1689
1690 #[test]
1691 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1692 let page_writer = get_test_page_writer();
1693 let props = Arc::new(
1694 WriterProperties::builder()
1695 .set_dictionary_enabled(true)
1696 .build(),
1697 );
1698 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1699 writer
1700 .write_batch(&[true, false, true, false], None, None)
1701 .unwrap();
1702
1703 let r = writer.close().unwrap();
1704 assert_eq!(r.bytes_written, 1);
1707 assert_eq!(r.rows_written, 4);
1708
1709 let metadata = r.metadata;
1710 assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
1711 assert_eq!(metadata.num_values(), 4); assert_eq!(metadata.dictionary_page_offset(), None);
1713 }
1714
1715 #[test]
1716 fn test_column_writer_default_encoding_support_bool() {
1717 check_encoding_write_support::<BoolType>(
1718 WriterVersion::PARQUET_1_0,
1719 true,
1720 &[true, false],
1721 None,
1722 &[Encoding::PLAIN, Encoding::RLE],
1723 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1724 );
1725 check_encoding_write_support::<BoolType>(
1726 WriterVersion::PARQUET_1_0,
1727 false,
1728 &[true, false],
1729 None,
1730 &[Encoding::PLAIN, Encoding::RLE],
1731 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1732 );
1733 check_encoding_write_support::<BoolType>(
1734 WriterVersion::PARQUET_2_0,
1735 true,
1736 &[true, false],
1737 None,
1738 &[Encoding::RLE],
1739 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1740 );
1741 check_encoding_write_support::<BoolType>(
1742 WriterVersion::PARQUET_2_0,
1743 false,
1744 &[true, false],
1745 None,
1746 &[Encoding::RLE],
1747 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1748 );
1749 }
1750
1751 #[test]
1752 fn test_column_writer_default_encoding_support_int32() {
1753 check_encoding_write_support::<Int32Type>(
1754 WriterVersion::PARQUET_1_0,
1755 true,
1756 &[1, 2],
1757 Some(0),
1758 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1759 &[
1760 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1761 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1762 ],
1763 );
1764 check_encoding_write_support::<Int32Type>(
1765 WriterVersion::PARQUET_1_0,
1766 false,
1767 &[1, 2],
1768 None,
1769 &[Encoding::PLAIN, Encoding::RLE],
1770 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1771 );
1772 check_encoding_write_support::<Int32Type>(
1773 WriterVersion::PARQUET_2_0,
1774 true,
1775 &[1, 2],
1776 Some(0),
1777 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1778 &[
1779 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1780 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1781 ],
1782 );
1783 check_encoding_write_support::<Int32Type>(
1784 WriterVersion::PARQUET_2_0,
1785 false,
1786 &[1, 2],
1787 None,
1788 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1789 &[encoding_stats(
1790 PageType::DATA_PAGE_V2,
1791 Encoding::DELTA_BINARY_PACKED,
1792 1,
1793 )],
1794 );
1795 }
1796
1797 #[test]
1798 fn test_column_writer_default_encoding_support_int64() {
1799 check_encoding_write_support::<Int64Type>(
1800 WriterVersion::PARQUET_1_0,
1801 true,
1802 &[1, 2],
1803 Some(0),
1804 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1805 &[
1806 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1807 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1808 ],
1809 );
1810 check_encoding_write_support::<Int64Type>(
1811 WriterVersion::PARQUET_1_0,
1812 false,
1813 &[1, 2],
1814 None,
1815 &[Encoding::PLAIN, Encoding::RLE],
1816 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1817 );
1818 check_encoding_write_support::<Int64Type>(
1819 WriterVersion::PARQUET_2_0,
1820 true,
1821 &[1, 2],
1822 Some(0),
1823 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1824 &[
1825 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1826 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1827 ],
1828 );
1829 check_encoding_write_support::<Int64Type>(
1830 WriterVersion::PARQUET_2_0,
1831 false,
1832 &[1, 2],
1833 None,
1834 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1835 &[encoding_stats(
1836 PageType::DATA_PAGE_V2,
1837 Encoding::DELTA_BINARY_PACKED,
1838 1,
1839 )],
1840 );
1841 }
1842
1843 #[test]
1844 fn test_column_writer_default_encoding_support_int96() {
1845 check_encoding_write_support::<Int96Type>(
1846 WriterVersion::PARQUET_1_0,
1847 true,
1848 &[Int96::from(vec![1, 2, 3])],
1849 Some(0),
1850 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1851 &[
1852 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1853 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1854 ],
1855 );
1856 check_encoding_write_support::<Int96Type>(
1857 WriterVersion::PARQUET_1_0,
1858 false,
1859 &[Int96::from(vec![1, 2, 3])],
1860 None,
1861 &[Encoding::PLAIN, Encoding::RLE],
1862 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1863 );
1864 check_encoding_write_support::<Int96Type>(
1865 WriterVersion::PARQUET_2_0,
1866 true,
1867 &[Int96::from(vec![1, 2, 3])],
1868 Some(0),
1869 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1870 &[
1871 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1872 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1873 ],
1874 );
1875 check_encoding_write_support::<Int96Type>(
1876 WriterVersion::PARQUET_2_0,
1877 false,
1878 &[Int96::from(vec![1, 2, 3])],
1879 None,
1880 &[Encoding::PLAIN, Encoding::RLE],
1881 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1882 );
1883 }
1884
1885 #[test]
1886 fn test_column_writer_default_encoding_support_float() {
1887 check_encoding_write_support::<FloatType>(
1888 WriterVersion::PARQUET_1_0,
1889 true,
1890 &[1.0, 2.0],
1891 Some(0),
1892 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1893 &[
1894 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1895 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1896 ],
1897 );
1898 check_encoding_write_support::<FloatType>(
1899 WriterVersion::PARQUET_1_0,
1900 false,
1901 &[1.0, 2.0],
1902 None,
1903 &[Encoding::PLAIN, Encoding::RLE],
1904 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1905 );
1906 check_encoding_write_support::<FloatType>(
1907 WriterVersion::PARQUET_2_0,
1908 true,
1909 &[1.0, 2.0],
1910 Some(0),
1911 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1912 &[
1913 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1914 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1915 ],
1916 );
1917 check_encoding_write_support::<FloatType>(
1918 WriterVersion::PARQUET_2_0,
1919 false,
1920 &[1.0, 2.0],
1921 None,
1922 &[Encoding::PLAIN, Encoding::RLE],
1923 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1924 );
1925 }
1926
1927 #[test]
1928 fn test_column_writer_default_encoding_support_double() {
1929 check_encoding_write_support::<DoubleType>(
1930 WriterVersion::PARQUET_1_0,
1931 true,
1932 &[1.0, 2.0],
1933 Some(0),
1934 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1935 &[
1936 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1937 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1938 ],
1939 );
1940 check_encoding_write_support::<DoubleType>(
1941 WriterVersion::PARQUET_1_0,
1942 false,
1943 &[1.0, 2.0],
1944 None,
1945 &[Encoding::PLAIN, Encoding::RLE],
1946 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1947 );
1948 check_encoding_write_support::<DoubleType>(
1949 WriterVersion::PARQUET_2_0,
1950 true,
1951 &[1.0, 2.0],
1952 Some(0),
1953 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1954 &[
1955 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1956 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1957 ],
1958 );
1959 check_encoding_write_support::<DoubleType>(
1960 WriterVersion::PARQUET_2_0,
1961 false,
1962 &[1.0, 2.0],
1963 None,
1964 &[Encoding::PLAIN, Encoding::RLE],
1965 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1966 );
1967 }
1968
1969 #[test]
1970 fn test_column_writer_default_encoding_support_byte_array() {
1971 check_encoding_write_support::<ByteArrayType>(
1972 WriterVersion::PARQUET_1_0,
1973 true,
1974 &[ByteArray::from(vec![1u8])],
1975 Some(0),
1976 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1977 &[
1978 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1979 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1980 ],
1981 );
1982 check_encoding_write_support::<ByteArrayType>(
1983 WriterVersion::PARQUET_1_0,
1984 false,
1985 &[ByteArray::from(vec![1u8])],
1986 None,
1987 &[Encoding::PLAIN, Encoding::RLE],
1988 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1989 );
1990 check_encoding_write_support::<ByteArrayType>(
1991 WriterVersion::PARQUET_2_0,
1992 true,
1993 &[ByteArray::from(vec![1u8])],
1994 Some(0),
1995 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1996 &[
1997 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1998 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1999 ],
2000 );
2001 check_encoding_write_support::<ByteArrayType>(
2002 WriterVersion::PARQUET_2_0,
2003 false,
2004 &[ByteArray::from(vec![1u8])],
2005 None,
2006 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2007 &[encoding_stats(
2008 PageType::DATA_PAGE_V2,
2009 Encoding::DELTA_BYTE_ARRAY,
2010 1,
2011 )],
2012 );
2013 }
2014
2015 #[test]
2016 fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2017 check_encoding_write_support::<FixedLenByteArrayType>(
2018 WriterVersion::PARQUET_1_0,
2019 true,
2020 &[ByteArray::from(vec![1u8]).into()],
2021 None,
2022 &[Encoding::PLAIN, Encoding::RLE],
2023 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2024 );
2025 check_encoding_write_support::<FixedLenByteArrayType>(
2026 WriterVersion::PARQUET_1_0,
2027 false,
2028 &[ByteArray::from(vec![1u8]).into()],
2029 None,
2030 &[Encoding::PLAIN, Encoding::RLE],
2031 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2032 );
2033 check_encoding_write_support::<FixedLenByteArrayType>(
2034 WriterVersion::PARQUET_2_0,
2035 true,
2036 &[ByteArray::from(vec![1u8]).into()],
2037 Some(0),
2038 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2039 &[
2040 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2041 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2042 ],
2043 );
2044 check_encoding_write_support::<FixedLenByteArrayType>(
2045 WriterVersion::PARQUET_2_0,
2046 false,
2047 &[ByteArray::from(vec![1u8]).into()],
2048 None,
2049 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2050 &[encoding_stats(
2051 PageType::DATA_PAGE_V2,
2052 Encoding::DELTA_BYTE_ARRAY,
2053 1,
2054 )],
2055 );
2056 }
2057
2058 #[test]
2059 fn test_column_writer_check_metadata() {
2060 let page_writer = get_test_page_writer();
2061 let props = Default::default();
2062 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2063 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2064
2065 let r = writer.close().unwrap();
2066 assert_eq!(r.bytes_written, 20);
2067 assert_eq!(r.rows_written, 4);
2068
2069 let metadata = r.metadata;
2070 assert_eq!(
2071 metadata.encodings(),
2072 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2073 );
2074 assert_eq!(metadata.num_values(), 4);
2075 assert_eq!(metadata.compressed_size(), 20);
2076 assert_eq!(metadata.uncompressed_size(), 20);
2077 assert_eq!(metadata.data_page_offset(), 0);
2078 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2079 if let Some(stats) = metadata.statistics() {
2080 assert_eq!(stats.null_count_opt(), Some(0));
2081 assert_eq!(stats.distinct_count_opt(), None);
2082 if let Statistics::Int32(stats) = stats {
2083 assert_eq!(stats.min_opt().unwrap(), &1);
2084 assert_eq!(stats.max_opt().unwrap(), &4);
2085 } else {
2086 panic!("expecting Statistics::Int32");
2087 }
2088 } else {
2089 panic!("metadata missing statistics");
2090 }
2091 }
2092
2093 #[test]
2094 fn test_column_writer_check_byte_array_min_max() {
2095 let page_writer = get_test_page_writer();
2096 let props = Default::default();
2097 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2098 writer
2099 .write_batch(
2100 &[
2101 ByteArray::from(vec![
2102 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2103 35u8, 231u8, 90u8, 0u8, 0u8,
2104 ]),
2105 ByteArray::from(vec![
2106 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2107 152u8, 177u8, 56u8, 0u8, 0u8,
2108 ]),
2109 ByteArray::from(vec![
2110 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2111 0u8,
2112 ]),
2113 ByteArray::from(vec![
2114 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2115 44u8, 0u8, 0u8,
2116 ]),
2117 ],
2118 None,
2119 None,
2120 )
2121 .unwrap();
2122 let metadata = writer.close().unwrap().metadata;
2123 if let Some(stats) = metadata.statistics() {
2124 if let Statistics::ByteArray(stats) = stats {
2125 assert_eq!(
2126 stats.min_opt().unwrap(),
2127 &ByteArray::from(vec![
2128 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2129 35u8, 231u8, 90u8, 0u8, 0u8,
2130 ])
2131 );
2132 assert_eq!(
2133 stats.max_opt().unwrap(),
2134 &ByteArray::from(vec![
2135 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2136 44u8, 0u8, 0u8,
2137 ])
2138 );
2139 } else {
2140 panic!("expecting Statistics::ByteArray");
2141 }
2142 } else {
2143 panic!("metadata missing statistics");
2144 }
2145 }
2146
2147 #[test]
2148 fn test_column_writer_uint32_converted_type_min_max() {
2149 let page_writer = get_test_page_writer();
2150 let props = Default::default();
2151 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2152 page_writer,
2153 0,
2154 0,
2155 props,
2156 );
2157 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2158 let metadata = writer.close().unwrap().metadata;
2159 if let Some(stats) = metadata.statistics() {
2160 if let Statistics::Int32(stats) = stats {
2161 assert_eq!(stats.min_opt().unwrap(), &0,);
2162 assert_eq!(stats.max_opt().unwrap(), &5,);
2163 } else {
2164 panic!("expecting Statistics::Int32");
2165 }
2166 } else {
2167 panic!("metadata missing statistics");
2168 }
2169 }
2170
2171 #[test]
2172 fn test_column_writer_precalculated_statistics() {
2173 let page_writer = get_test_page_writer();
2174 let props = Arc::new(
2175 WriterProperties::builder()
2176 .set_statistics_enabled(EnabledStatistics::Chunk)
2177 .build(),
2178 );
2179 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2180 writer
2181 .write_batch_with_statistics(
2182 &[1, 2, 3, 4],
2183 None,
2184 None,
2185 Some(&-17),
2186 Some(&9000),
2187 Some(55),
2188 )
2189 .unwrap();
2190
2191 let r = writer.close().unwrap();
2192 assert_eq!(r.bytes_written, 20);
2193 assert_eq!(r.rows_written, 4);
2194
2195 let metadata = r.metadata;
2196 assert_eq!(
2197 metadata.encodings(),
2198 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2199 );
2200 assert_eq!(metadata.num_values(), 4);
2201 assert_eq!(metadata.compressed_size(), 20);
2202 assert_eq!(metadata.uncompressed_size(), 20);
2203 assert_eq!(metadata.data_page_offset(), 0);
2204 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2205 if let Some(stats) = metadata.statistics() {
2206 assert_eq!(stats.null_count_opt(), Some(0));
2207 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2208 if let Statistics::Int32(stats) = stats {
2209 assert_eq!(stats.min_opt().unwrap(), &-17);
2210 assert_eq!(stats.max_opt().unwrap(), &9000);
2211 } else {
2212 panic!("expecting Statistics::Int32");
2213 }
2214 } else {
2215 panic!("metadata missing statistics");
2216 }
2217 }
2218
2219 #[test]
2220 fn test_mixed_precomputed_statistics() {
2221 let mut buf = Vec::with_capacity(100);
2222 let mut write = TrackedWrite::new(&mut buf);
2223 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2224 let props = Arc::new(
2225 WriterProperties::builder()
2226 .set_write_page_header_statistics(true)
2227 .build(),
2228 );
2229 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2230
2231 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2232 writer
2233 .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2234 .unwrap();
2235
2236 let r = writer.close().unwrap();
2237
2238 let stats = r.metadata.statistics().unwrap();
2239 assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2240 assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2241 assert_eq!(stats.null_count_opt(), Some(0));
2242 assert!(stats.distinct_count_opt().is_none());
2243
2244 drop(write);
2245
2246 let props = ReaderProperties::builder()
2247 .set_backward_compatible_lz4(false)
2248 .build();
2249 let reader = SerializedPageReader::new_with_properties(
2250 Arc::new(Bytes::from(buf)),
2251 &r.metadata,
2252 r.rows_written as usize,
2253 None,
2254 Arc::new(props),
2255 )
2256 .unwrap();
2257
2258 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2259 assert_eq!(pages.len(), 2);
2260
2261 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2262 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2263
2264 let page_statistics = pages[1].statistics().unwrap();
2265 assert_eq!(
2266 page_statistics.min_bytes_opt().unwrap(),
2267 1_i32.to_le_bytes()
2268 );
2269 assert_eq!(
2270 page_statistics.max_bytes_opt().unwrap(),
2271 7_i32.to_le_bytes()
2272 );
2273 assert_eq!(page_statistics.null_count_opt(), Some(0));
2274 assert!(page_statistics.distinct_count_opt().is_none());
2275 }
2276
2277 #[test]
2278 fn test_disabled_statistics() {
2279 let mut buf = Vec::with_capacity(100);
2280 let mut write = TrackedWrite::new(&mut buf);
2281 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2282 let props = WriterProperties::builder()
2283 .set_statistics_enabled(EnabledStatistics::None)
2284 .set_writer_version(WriterVersion::PARQUET_2_0)
2285 .build();
2286 let props = Arc::new(props);
2287
2288 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2289 writer
2290 .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2291 .unwrap();
2292
2293 let r = writer.close().unwrap();
2294 assert!(r.metadata.statistics().is_none());
2295
2296 drop(write);
2297
2298 let props = ReaderProperties::builder()
2299 .set_backward_compatible_lz4(false)
2300 .build();
2301 let reader = SerializedPageReader::new_with_properties(
2302 Arc::new(Bytes::from(buf)),
2303 &r.metadata,
2304 r.rows_written as usize,
2305 None,
2306 Arc::new(props),
2307 )
2308 .unwrap();
2309
2310 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2311 assert_eq!(pages.len(), 2);
2312
2313 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2314 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2315
2316 match &pages[1] {
2317 Page::DataPageV2 {
2318 num_values,
2319 num_nulls,
2320 num_rows,
2321 statistics,
2322 ..
2323 } => {
2324 assert_eq!(*num_values, 6);
2325 assert_eq!(*num_nulls, 2);
2326 assert_eq!(*num_rows, 6);
2327 assert!(statistics.is_none());
2328 }
2329 _ => unreachable!(),
2330 }
2331 }
2332
2333 #[test]
2334 fn test_column_writer_empty_column_roundtrip() {
2335 let props = Default::default();
2336 column_roundtrip::<Int32Type>(props, &[], None, None);
2337 }
2338
2339 #[test]
2340 fn test_column_writer_non_nullable_values_roundtrip() {
2341 let props = Default::default();
2342 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2343 }
2344
/// Random INT32 round-trip over the full `i32` range with default properties.
///
/// NOTE(review): the trailing `10, 0` are presumably the max definition and repetition
/// levels (nullable, non-repeated column) — confirm against `column_roundtrip_random`.
#[test]
fn test_column_writer_nullable_non_repeated_values_roundtrip() {
    column_roundtrip_random::<Int32Type>(Default::default(), 1024, i32::MIN, i32::MAX, 10, 0);
}
2350
/// Random INT32 round-trip over the full `i32` range with default properties.
///
/// NOTE(review): the trailing `10, 10` are presumably the max definition and repetition
/// levels (nullable, repeated column) — confirm against `column_roundtrip_random`.
#[test]
fn test_column_writer_nullable_repeated_values_roundtrip() {
    column_roundtrip_random::<Int32Type>(Default::default(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2356
/// Random INT32 round-trip with both the dictionary page and data page size limits
/// set to 32.
#[test]
fn test_column_writer_dictionary_fallback_small_data_page() {
    let builder = WriterProperties::builder()
        .set_dictionary_page_size_limit(32)
        .set_data_page_size_limit(32);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2365
/// Repeats the random INT32 round-trip once for each of a handful of write batch sizes.
#[test]
fn test_column_writer_small_write_batch_size() {
    for batch_size in [1usize, 2, 5, 10, 11, 1023] {
        let props = WriterProperties::builder()
            .set_write_batch_size(batch_size)
            .build();
        column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
    }
}
2374
/// Random INT32 round-trip with writer version `PARQUET_1_0` and dictionary encoding
/// turned off.
#[test]
fn test_column_writer_dictionary_disabled_v1() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_dictionary_enabled(false);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2383
/// Random INT32 round-trip with writer version `PARQUET_2_0` and dictionary encoding
/// turned off.
#[test]
fn test_column_writer_dictionary_disabled_v2() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_dictionary_enabled(false);
    column_roundtrip_random::<Int32Type>(builder.build(), 1024, i32::MIN, i32::MAX, 10, 10);
}
2392
/// Random INT32 round-trip with writer version `PARQUET_1_0` and SNAPPY compression.
#[test]
fn test_column_writer_compression_v1() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_1_0)
        .set_compression(Compression::SNAPPY);
    column_roundtrip_random::<Int32Type>(builder.build(), 2048, i32::MIN, i32::MAX, 10, 10);
}
2401
/// Random INT32 round-trip with writer version `PARQUET_2_0` and SNAPPY compression.
#[test]
fn test_column_writer_compression_v2() {
    let builder = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .set_compression(Compression::SNAPPY);
    column_roundtrip_random::<Int32Type>(builder.build(), 2048, i32::MIN, i32::MAX, 10, 10);
}
2410
/// Writes ten INT32 values to a temp file with a data page size limit of 10 and a write
/// batch size of 3, then reads the pages back.
///
/// Expects one dictionary page followed by two data pages (9 values, then 1), and page
/// encoding stats reporting one PLAIN dictionary page and two RLE_DICTIONARY data pages.
#[test]
fn test_column_writer_add_data_pages_with_dict() {
    let mut file = tempfile::tempfile().unwrap();
    let mut write = TrackedWrite::new(&mut file);
    let page_writer = Box::new(SerializedPageWriter::new(&mut write));
    let writer_props = Arc::new(
        WriterProperties::builder()
            .set_data_page_size_limit(10)
            .set_write_batch_size(3)
            .build(),
    );
    let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, writer_props);
    writer.write_batch(data, None, None).unwrap();
    let r = writer.close().unwrap();

    // `write` borrows `file` mutably; drop it before the file is handed to the reader.
    drop(write);

    let reader_props = Arc::new(
        ReaderProperties::builder()
            .set_backward_compatible_lz4(false)
            .build(),
    );
    let mut page_reader = Box::new(
        SerializedPageReader::new_with_properties(
            Arc::new(file),
            &r.metadata,
            r.rows_written as usize,
            None,
            reader_props,
        )
        .unwrap(),
    );

    // Drain the reader into (page type, value count, buffer length) triples.
    let observed: Vec<_> = std::iter::from_fn(|| page_reader.get_next_page().unwrap())
        .map(|page| (page.page_type(), page.num_values(), page.buffer().len()))
        .collect();
    assert_eq!(
        observed,
        vec![
            (PageType::DICTIONARY_PAGE, 10, 40),
            (PageType::DATA_PAGE, 9, 10),
            (PageType::DATA_PAGE, 1, 3),
        ]
    );

    let expected_encoding_stats = vec![
        PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: Encoding::PLAIN,
            count: 1,
        },
        PageEncodingStats {
            page_type: PageType::DATA_PAGE,
            encoding: Encoding::RLE_DICTIONARY,
            count: 2,
        },
    ];
    assert_eq!(
        r.metadata.page_encoding_stats(),
        Some(&expected_encoding_stats)
    );
}
2473
/// Boolean statistics round-trip: expects min `false`, max `true`, and the statistics
/// to report as not min/max backwards compatible.
#[test]
fn test_bool_statistics() {
    let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Boolean(typed) => {
            assert_eq!(typed.min_opt(), Some(&false));
            assert_eq!(typed.max_opt(), Some(&true));
        }
        _ => panic!("expecting Statistics::Boolean, got {stats:?}"),
    }
}
2487
/// INT32 statistics round-trip: expects min `-2`, max `3`, and backwards-compatible
/// min/max.
#[test]
fn test_int32_statistics() {
    let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int32(typed) => {
            assert_eq!(typed.min_opt(), Some(&-2));
            assert_eq!(typed.max_opt(), Some(&3));
        }
        _ => panic!("expecting Statistics::Int32, got {stats:?}"),
    }
}
2499
/// INT64 statistics round-trip: expects min `-2`, max `3`, and backwards-compatible
/// min/max.
#[test]
fn test_int64_statistics() {
    let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int64(typed) => {
            assert_eq!(typed.min_opt(), Some(&-2));
            assert_eq!(typed.max_opt(), Some(&3));
        }
        _ => panic!("expecting Statistics::Int64, got {stats:?}"),
    }
}
2511
/// INT96 statistics round-trip over four values; checks the expected min/max pair and
/// that the statistics report as not min/max backwards compatible.
#[test]
fn test_int96_statistics() {
    let input: Vec<Int96> = vec![
        Int96::from(vec![1, 20, 30]),
        Int96::from(vec![3, 20, 10]),
        Int96::from(vec![0, 20, 30]),
        Int96::from(vec![2, 20, 30]),
    ];

    let stats = statistics_roundtrip::<Int96Type>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Int96(typed) => {
            let expected_min = Int96::from(vec![0, 20, 30]);
            let expected_max = Int96::from(vec![3, 20, 10]);
            assert_eq!(typed.min_opt(), Some(&expected_min));
            assert_eq!(typed.max_opt(), Some(&expected_max));
        }
        _ => panic!("expecting Statistics::Int96, got {stats:?}"),
    }
}
2532
/// FLOAT statistics round-trip: expects min `-2.0`, max `3.0`, and backwards-compatible
/// min/max.
#[test]
fn test_float_statistics() {
    let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt(), Some(&-2.0));
            assert_eq!(typed.max_opt(), Some(&3.0));
        }
        _ => panic!("expecting Statistics::Float, got {stats:?}"),
    }
}
2544
/// DOUBLE statistics round-trip: expects min `-2.0`, max `3.0`, and backwards-compatible
/// min/max.
#[test]
fn test_double_statistics() {
    let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt(), Some(&-2.0));
            assert_eq!(typed.max_opt(), Some(&3.0));
        }
        _ => panic!("expecting Statistics::Double, got {stats:?}"),
    }
}
2556
/// BYTE_ARRAY statistics round-trip over five strings of differing length: expects
/// min `"aaw"`, max `"zz"`, and not min/max backwards compatible.
#[test]
fn test_byte_array_statistics() {
    let input: Vec<ByteArray> = ["aawaa", "zz", "aaw", "m", "qrs"]
        .into_iter()
        .map(|text| ByteArray::from(text))
        .collect();

    let stats = statistics_roundtrip::<ByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::ByteArray(typed) => {
            assert_eq!(typed.min_opt(), Some(&ByteArray::from("aaw")));
            assert_eq!(typed.max_opt(), Some(&ByteArray::from("zz")));
        }
        _ => panic!("expecting Statistics::ByteArray, got {stats:?}"),
    }
}
2573
/// FIXED_LEN_BYTE_ARRAY statistics round-trip over five string values; checks the
/// expected min/max and that the statistics are not min/max backwards compatible.
#[test]
fn test_fixed_len_byte_array_statistics() {
    let input: Vec<_> = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
        .into_iter()
        .map(|text| ByteArray::from(text).into())
        .collect();

    let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
    assert!(!stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::FixedLenByteArray(typed) => {
            let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
            let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
            assert_eq!(typed.min_opt(), Some(&expected_min));
            assert_eq!(typed.max_opt(), Some(&expected_max));
        }
        _ => panic!("expecting Statistics::FixedLenByteArray, got {stats:?}"),
    }
}
2592
/// Float16 statistics round-trip over `[-1, 3, -2, 2]`: expects min `-2`, max `3`, and
/// backwards-compatible min/max.
#[test]
fn test_column_writer_check_float16_min_max() {
    let halves = [
        -f16::ONE,
        f16::from_f32(3.0),
        -f16::from_f32(2.0),
        f16::from_f32(2.0),
    ];
    let input: Vec<_> = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (min, max) = (stats.min_opt().unwrap(), stats.max_opt().unwrap());
    assert_eq!(min, &ByteArray::from(-f16::from_f32(2.0)));
    assert_eq!(max, &ByteArray::from(f16::from_f32(3.0)));
}
2616
/// Float16 statistics round-trip with a NaN between two finite values: expects min `1`
/// and max `2`, i.e. the NaN appears in neither bound.
#[test]
fn test_column_writer_check_float16_nan_middle() {
    let halves = [f16::ONE, f16::NAN, f16::ONE + f16::ONE];
    let input: Vec<_> = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (min, max) = (stats.min_opt().unwrap(), stats.max_opt().unwrap());
    assert_eq!(min, &ByteArray::from(f16::ONE));
    assert_eq!(max, &ByteArray::from(f16::ONE + f16::ONE));
}
2632
/// Float16 statistics for `[1, NaN, 2]`: expects min `1`, max `2`, and
/// backwards-compatible min/max.
#[test]
fn test_float16_statistics_nan_middle() {
    let halves = [f16::ONE, f16::NAN, f16::ONE + f16::ONE];
    let input: Vec<_> = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (min, max) = (stats.min_opt().unwrap(), stats.max_opt().unwrap());
    assert_eq!(min, &ByteArray::from(f16::ONE));
    assert_eq!(max, &ByteArray::from(f16::ONE + f16::ONE));
}
2648
/// Float16 statistics for `[NaN, 1, 2]`: a leading NaN is expected to leave min `1` and
/// max `2`.
#[test]
fn test_float16_statistics_nan_start() {
    let halves = [f16::NAN, f16::ONE, f16::ONE + f16::ONE];
    let input: Vec<_> = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (min, max) = (stats.min_opt().unwrap(), stats.max_opt().unwrap());
    assert_eq!(min, &ByteArray::from(f16::ONE));
    assert_eq!(max, &ByteArray::from(f16::ONE + f16::ONE));
}
2664
/// Float16 statistics for an all-NaN column: expects no min or max bytes to be present.
#[test]
fn test_float16_statistics_nan_only() {
    let input: Vec<_> = [f16::NAN; 2]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    for bound in [stats.min_bytes_opt(), stats.max_bytes_opt()] {
        assert!(bound.is_none());
    }
    assert!(stats.is_min_max_backwards_compatible());
}
2677
/// Float16 statistics for a single `+0`: expects min `-0` and max `+0`.
#[test]
fn test_float16_statistics_zero_only() {
    let input: Vec<_> = [f16::ZERO]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (min, max) = (stats.min_opt().unwrap(), stats.max_opt().unwrap());
    assert_eq!(min, &ByteArray::from(f16::NEG_ZERO));
    assert_eq!(max, &ByteArray::from(f16::ZERO));
}
2690
/// Float16 statistics for a single `-0`: expects min `-0` and max `+0`.
#[test]
fn test_float16_statistics_neg_zero_only() {
    let input: Vec<_> = [f16::NEG_ZERO]
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (min, max) = (stats.min_opt().unwrap(), stats.max_opt().unwrap());
    assert_eq!(min, &ByteArray::from(f16::NEG_ZERO));
    assert_eq!(max, &ByteArray::from(f16::ZERO));
}
2703
/// Float16 statistics for `[+0, 1, NaN, PI]`: expects the min to be reported as `-0`
/// and the max as `PI`.
#[test]
fn test_float16_statistics_zero_min() {
    let halves = [f16::ZERO, f16::ONE, f16::NAN, f16::PI];
    let input: Vec<_> = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (min, max) = (stats.min_opt().unwrap(), stats.max_opt().unwrap());
    assert_eq!(min, &ByteArray::from(f16::NEG_ZERO));
    assert_eq!(max, &ByteArray::from(f16::PI));
}
2716
/// Float16 statistics for `[-0, -1, NaN, -PI]`: expects the min to be `-PI` and the max
/// to be reported as `+0`.
#[test]
fn test_float16_statistics_neg_zero_max() {
    let halves = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI];
    let input: Vec<_> = halves
        .into_iter()
        .map(|half| ByteArray::from(half).into())
        .collect();

    let stats = float16_statistics_roundtrip(&input);
    assert!(stats.is_min_max_backwards_compatible());
    let (min, max) = (stats.min_opt().unwrap(), stats.max_opt().unwrap());
    assert_eq!(min, &ByteArray::from(-f16::PI));
    assert_eq!(max, &ByteArray::from(f16::ZERO));
}
2729
/// FLOAT statistics for `[1.0, NaN, 2.0]`: expects min `1.0` and max `2.0`, i.e. the
/// NaN appears in neither bound.
#[test]
fn test_float_statistics_nan_middle() {
    let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt(), Some(&1.0));
            assert_eq!(typed.max_opt(), Some(&2.0));
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2741
/// FLOAT statistics for `[NaN, 1.0, 2.0]`: a leading NaN is expected to leave min `1.0`
/// and max `2.0`.
#[test]
fn test_float_statistics_nan_start() {
    let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt(), Some(&1.0));
            assert_eq!(typed.max_opt(), Some(&2.0));
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2753
/// FLOAT statistics for an all-NaN column: expects no min/max bytes, while the result
/// is still the `Float` statistics variant.
#[test]
fn test_float_statistics_nan_only() {
    let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
    for bound in [stats.min_bytes_opt(), stats.max_bytes_opt()] {
        assert!(bound.is_none());
    }
    assert!(stats.is_min_max_backwards_compatible());
    assert!(matches!(stats, Statistics::Float(_)));
}
2762
/// FLOAT statistics for a single `+0.0`: expects min `-0.0` (sign bit set) and max
/// `+0.0` (sign bit clear).
#[test]
fn test_float_statistics_zero_only() {
    let stats = statistics_roundtrip::<FloatType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            // `-0.0 == 0.0` compares equal, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2776
/// FLOAT statistics for a single `-0.0`: expects min `-0.0` (sign bit set) and max
/// `+0.0` (sign bit clear).
#[test]
fn test_float_statistics_neg_zero_only() {
    let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            // `-0.0 == 0.0` compares equal, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2790
/// FLOAT statistics for `[0.0, 1.0, NaN, 2.0]`: expects the min to be reported as
/// `-0.0` (sign bit set) and the max as `2.0`.
#[test]
fn test_float_statistics_zero_min() {
    let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(typed.max_opt(), Some(&2.0));
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2803
/// FLOAT statistics for `[-0.0, -1.0, NaN, -2.0]`: expects min `-2.0` and the max to be
/// reported as `+0.0` (sign bit clear).
#[test]
fn test_float_statistics_neg_zero_max() {
    let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Float(typed) => {
            assert_eq!(typed.min_opt(), Some(&-2.0));
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Float"),
    }
}
2816
/// DOUBLE statistics for `[1.0, NaN, 2.0]`: expects min `1.0` and max `2.0`, i.e. the
/// NaN appears in neither bound.
#[test]
fn test_double_statistics_nan_middle() {
    let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt(), Some(&1.0));
            assert_eq!(typed.max_opt(), Some(&2.0));
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2828
/// DOUBLE statistics for `[NaN, 1.0, 2.0]`: a leading NaN is expected to leave min
/// `1.0` and max `2.0`.
#[test]
fn test_double_statistics_nan_start() {
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt(), Some(&1.0));
            assert_eq!(typed.max_opt(), Some(&2.0));
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2840
/// DOUBLE statistics for an all-NaN column: expects no min/max bytes, while the result
/// is still the `Double` statistics variant.
#[test]
fn test_double_statistics_nan_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
    for bound in [stats.min_bytes_opt(), stats.max_bytes_opt()] {
        assert!(bound.is_none());
    }
    assert!(matches!(stats, Statistics::Double(_)));
    assert!(stats.is_min_max_backwards_compatible());
}
2849
/// DOUBLE statistics for a single `+0.0`: expects min `-0.0` (sign bit set) and max
/// `+0.0` (sign bit clear).
#[test]
fn test_double_statistics_zero_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            // `-0.0 == 0.0` compares equal, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2863
/// DOUBLE statistics for a single `-0.0`: expects min `-0.0` (sign bit set) and max
/// `+0.0` (sign bit clear).
#[test]
fn test_double_statistics_neg_zero_only() {
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            let (min, max) = (typed.min_opt().unwrap(), typed.max_opt().unwrap());
            // `-0.0 == 0.0` compares equal, so the sign is checked separately.
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2877
/// DOUBLE statistics for `[0.0, 1.0, NaN, 2.0]`: expects the min to be reported as
/// `-0.0` (sign bit set) and the max as `2.0`.
#[test]
fn test_double_statistics_zero_min() {
    let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            let min = typed.min_opt().unwrap();
            assert_eq!(min, &-0.0);
            assert!(min.is_sign_negative());
            assert_eq!(typed.max_opt(), Some(&2.0));
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2890
/// DOUBLE statistics for `[-0.0, -1.0, NaN, -2.0]`: expects min `-2.0` and the max to
/// be reported as `+0.0` (sign bit clear).
#[test]
fn test_double_statistics_neg_zero_max() {
    let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
    assert!(stats.is_min_max_backwards_compatible());
    match stats {
        Statistics::Double(typed) => {
            assert_eq!(typed.min_opt(), Some(&-2.0));
            let max = typed.max_opt().unwrap();
            assert_eq!(max, &0.0);
            assert!(max.is_sign_positive());
        }
        _ => panic!("expecting Statistics::Double"),
    }
}
2903
/// Table-driven check of `compare_greater_byte_array_decimals(lhs, rhs)` over empty,
/// single-byte, multi-byte and mixed-length inputs.
#[test]
fn test_compare_greater_byte_array_decimals() {
    // (lhs, rhs, expected result of `compare_greater_byte_array_decimals(lhs, rhs)`)
    let cases: [(&[u8], &[u8], bool); 9] = [
        (&[], &[], false),
        (&[1u8], &[], true),
        (&[], &[1u8], false),
        (&[1u8], &[0u8], true),
        (&[1u8], &[1u8], false),
        (&[1u8, 0u8], &[0u8], true),
        (&[0u8, 1u8], &[1u8, 0u8], false),
        (&[255u8, 35u8, 0u8, 0u8], &[0u8], false),
        (&[0u8], &[255u8, 35u8, 0u8, 0u8], true),
    ];

    for (lhs, rhs, expected) in cases {
        assert_eq!(compare_greater_byte_array_decimals(lhs, rhs), expected);
    }
}
2925
/// Writes four null entries (definition level 0, no values) to a nullable INT32 column
/// and checks the column index for the resulting page: flagged as a null page, empty
/// min/max, null count 4, no repetition histogram, definition histogram `[4, 0]`.
#[test]
fn test_column_index_with_null_pages() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 1, 0, Default::default());
    writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();

    let r = writer.close().unwrap();
    let col_idx = r.column_index.unwrap();

    assert!(col_idx.null_pages[0]);
    assert!(col_idx.min_values[0].is_empty());
    assert!(col_idx.max_values[0].is_empty());

    let null_counts = col_idx.null_counts.as_ref().unwrap();
    assert_eq!(null_counts[0], 4);

    assert!(col_idx.repetition_level_histograms.is_none());
    let def_level_histogram = col_idx.definition_level_histograms.unwrap();
    assert_eq!(def_level_histogram, &[4, 0]);
}
2951
/// Writes two INT32 batches separated by an explicit page flush and checks the column
/// index and offset index: two non-null pages, UNORDERED boundary order, chunk-level
/// min/max equal to the second page's column index entries, and first-row indexes 0
/// and 4.
#[test]
fn test_column_offset_index_metadata() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    // Flush between the batches; the assertions below expect two pages.
    writer.flush_data_pages().unwrap();
    writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(8, r.rows_written);

    assert_eq!(2, column_index.null_pages.len());
    assert_eq!(2, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
    let null_counts = column_index.null_counts.as_ref().unwrap();
    for page in 0..2 {
        assert!(!column_index.null_pages[page]);
        assert_eq!(0, null_counts[page]);
    }

    let stats = r
        .metadata
        .statistics()
        .expect("metadata missing statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::Int32(typed) => {
            // The second page holds both the overall minimum (-5) and maximum (8).
            assert_eq!(
                typed.min_bytes_opt(),
                Some(column_index.min_values[1].as_slice())
            );
            assert_eq!(
                typed.max_bytes_opt(),
                column_index.max_values.get(1).map(Vec::as_slice)
            );
        }
        _ => panic!("expecting Statistics::Int32"),
    }

    let first_rows = [
        offset_index.page_locations[0].first_row_index,
        offset_index.page_locations[1].first_row_index,
    ];
    assert_eq!(0, first_rows[0]);
    assert_eq!(4, first_rows[1]);
}
3006
/// Writes three 200-byte FIXED_LEN_BYTE_ARRAY values with the statistics truncate
/// length set to `None` and checks that the column index min/max differ from the chunk
/// statistics and are `DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH` bytes long.
#[test]
fn test_column_offset_index_metadata_truncating() {
    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(None)
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    // Three values, each 200 repetitions of a single byte: 97 ('a'), 112 ('p'), 98 ('b').
    let data: Vec<FixedLenByteArray> = [97_u8, 112_u8, 98_u8]
        .into_iter()
        .map(|fill| {
            let mut value = FixedLenByteArray::default();
            value.set_data(Bytes::from(vec![fill; 200]));
            value
        })
        .collect();

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(3, r.rows_written);

    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let stats = r
        .metadata
        .statistics()
        .expect("metadata missing statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::FixedLenByteArray(typed) => {
            let index_min = &column_index.min_values[0];
            let index_max = &column_index.max_values[0];

            // The column index entries must not equal the chunk statistics bytes.
            assert_ne!(typed.min_bytes_opt(), Some(index_min.as_slice()));
            assert_ne!(typed.max_bytes_opt(), Some(index_max.as_slice()));

            assert_eq!(
                index_min.len(),
                DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
            );
            assert_eq!(index_min.as_slice(), &[97_u8; 64]);
            assert_eq!(
                index_max.len(),
                DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
            );

            // The last byte of the max is expected to be one more than its first byte.
            assert_eq!(
                *index_max.last().unwrap(),
                *index_max.first().unwrap() + 1
            );
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3082
/// Writes the single value "Blart Versenwald III" with a column index truncate length
/// of 1 and checks that the column index min/max are the one-byte values "B" and "C"
/// and differ from the chunk statistics bytes.
#[test]
fn test_column_offset_index_truncating_spec_example() {
    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let mut value = FixedLenByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();
    let column_index = r.column_index.unwrap();
    let offset_index = r.offset_index.unwrap();

    assert_eq!(1, r.rows_written);

    assert_eq!(1, column_index.null_pages.len());
    assert_eq!(1, offset_index.page_locations.len());
    assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
    assert!(!column_index.null_pages[0]);
    assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

    let stats = r
        .metadata
        .statistics()
        .expect("metadata missing statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::FixedLenByteArray(_) => {
            let index_min = &column_index.min_values[0];
            let index_max = &column_index.max_values[0];

            assert_eq!(index_min.len(), 1);
            assert_eq!(index_max.len(), 1);

            assert_eq!("B".as_bytes(), index_min.as_slice());
            assert_eq!("C".as_bytes(), index_max.as_slice());

            assert_ne!(index_min, stats.min_bytes_opt().unwrap());
            assert_ne!(index_max, stats.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3137
/// Writes a single Float16 value (`PI`, little-endian bytes) with a column index
/// truncate length of 1 and checks that both the column index and the chunk statistics
/// still hold the full, untruncated bytes as min and max.
#[test]
fn test_float16_min_max_no_truncation() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let mut writer = get_test_float16_column_writer(get_test_page_writer(), props);

    let expected_value = f16::PI.to_le_bytes().to_vec();
    let data = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    let column_index = r.column_index.unwrap();
    for bound in [&column_index.min_values[0], &column_index.max_values[0]] {
        assert_eq!(expected_value, bound.as_slice());
    }

    match r.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            assert_eq!(expected_value, typed.min_bytes_opt().unwrap());
            assert_eq!(expected_value, typed.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3172
/// Writes a single 16-byte value through the decimal column writer with a column index
/// truncate length of 1 and checks that both the column index and the chunk statistics
/// still hold the full, untruncated bytes as min and max.
#[test]
fn test_decimal_min_max_no_truncation() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_index_truncate_length(Some(1))
            .build(),
    );
    let page_writer = get_test_page_writer();
    let mut writer =
        get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    let expected_value: Vec<u8> = vec![
        255, 255, 255, 255, 255, 255, 255, 255, 179, 172, 19, 35, 231, 90, 0, 0,
    ];
    let data = vec![ByteArray::from(expected_value.clone()).into()];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    let column_index = r.column_index.unwrap();
    for bound in [&column_index.min_values[0], &column_index.max_values[0]] {
        assert_eq!(expected_value, bound.as_slice());
    }

    match r.metadata.statistics().unwrap() {
        Statistics::FixedLenByteArray(typed) => {
            assert_eq!(expected_value, typed.min_bytes_opt().unwrap());
            assert_eq!(expected_value, typed.max_bytes_opt().unwrap());
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}
3211
/// Writes one BYTE_ARRAY string longer than 64 bytes with default writer properties and
/// checks that the chunk statistics min/max are inexact and 64 bytes long, with the max
/// differing from the min only in its final byte ('y' → 'z').
#[test]
fn test_statistics_truncating_byte_array_default() {
    let page_writer = get_test_page_writer();
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut long_value = ByteArray::default();
    long_value.set_data(Bytes::from(String::from(
        "This string is longer than 64 bytes, so it will almost certainly be truncated.",
    )));
    let data = vec![long_value];
    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    match stats {
        Statistics::ByteArray(typed) => {
            let min_value = typed.min_opt().unwrap();
            let max_value = typed.max_opt().unwrap();

            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            let expected_len = 64;
            assert_eq!(min_value.len(), expected_len);
            assert_eq!(max_value.len(), expected_len);

            let expected_min =
                "This string is longer than 64 bytes, so it will almost certainly".as_bytes();
            let expected_max =
                "This string is longer than 64 bytes, so it will almost certainlz".as_bytes();
            assert_eq!(expected_min, min_value.as_bytes());
            assert_eq!(expected_max, max_value.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
3254
/// Writes the single BYTE_ARRAY value "Blart Versenwald III" with a statistics truncate
/// length of 1 and checks that the chunk statistics min/max are inexact one-byte
/// values, "B" and "C" respectively.
#[test]
fn test_statistics_truncating_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;

    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let r = writer.close().unwrap();

    assert_eq!(1, r.rows_written);

    let stats = r.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);
    match stats {
        Statistics::ByteArray(typed) => {
            let min_value = typed.min_opt().unwrap();
            let max_value = typed.max_opt().unwrap();

            assert!(!typed.min_is_exact());
            assert!(!typed.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!("B".as_bytes(), min_value.as_bytes());
            assert_eq!("C".as_bytes(), max_value.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}
3298
/// Writes one 16-byte fixed-length value with a statistics truncate length of one byte
/// and checks that the stored min/max are one byte long, flagged as inexact, and still
/// bound the original value once padded back out to 16 bytes.
#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
    const TEST_TRUNCATE_LENGTH: usize = 1;
    const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
    const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
    // Expected min is the leading byte; expected max is that byte plus one.
    const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
    const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
        [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];

    // Rebuilds a big-endian i128 from one leading byte followed by fifteen zero bytes.
    let widen = |leading: u8| {
        let mut bytes = [0u8; 16];
        bytes[0] = leading;
        i128::from_be_bytes(bytes)
    };

    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer =
        get_test_column_writer::<FixedLenByteArrayType>(get_test_page_writer(), 0, 0, props);

    let mut value = FixedLenByteArray::default();
    value.set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert_eq!(1, close_result.rows_written);

    let stats = close_result.metadata.statistics().expect("statistics");
    assert_eq!(stats.null_count_opt(), Some(0));
    assert_eq!(stats.distinct_count_opt(), None);

    let fixed_stats = match stats {
        Statistics::FixedLenByteArray(inner) => inner,
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    };
    let min_value = fixed_stats.min_opt().unwrap();
    let max_value = fixed_stats.max_opt().unwrap();

    assert!(!fixed_stats.min_is_exact());
    assert!(!fixed_stats.max_is_exact());

    assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
    assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

    assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
    assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

    let reconstructed_min = widen(min_value.as_bytes()[0]);
    let reconstructed_max = widen(max_value.as_bytes()[0]);

    // The truncated bounds must still enclose the value that was written.
    println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
    assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
    println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
    assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
}
3394
/// Compile-time check that `ColumnWriterImpl<Int32Type>` implements `Send`.
#[test]
fn test_send() {
    fn assert_send<S: Send>() {}
    assert_send::<ColumnWriterImpl<Int32Type>>();
}
3400
/// Exercises `increment` on a plain increment, a carry across trailing 0xFF bytes,
/// and an all-0xFF input for which no result is returned.
#[test]
fn test_increment() {
    // No carry needed: only the last byte changes.
    let plain = increment(vec![0, 0, 0]).unwrap();
    assert_eq!(&plain, &[0, 0, 1]);

    // Trailing 255s wrap to 0 and the carry lands on the first byte.
    let carried = increment(vec![0, 255, 255]).unwrap();
    assert_eq!(&carried, &[1, 0, 0]);

    // Every byte is already 255, so there is nothing to increment.
    let overflowed = increment(vec![255, 255, 255]);
    assert!(overflowed.is_none());
}
3414
/// Checks `increment_utf8` across characters of every UTF-8 encoded width, including
/// inputs for which no incremented string is returned.
#[test]
fn test_increment_utf8() {
    /// Increments `o`, requires the result to be valid UTF-8 equal to `expected`, and
    /// verifies it sorts after `o` both as a string and as a `ByteArray`.
    fn test_inc(o: &str, expected: &str) {
        let incremented = match String::from_utf8(increment_utf8(o).unwrap()) {
            Ok(s) => s,
            Err(_) => panic!("Expected incremented UTF8 string to also be valid."),
        };
        assert_eq!(incremented, expected);
        assert!(incremented.as_str() > o);

        let greater = ByteArray::from(incremented.into_bytes());
        let original = ByteArray::from(o.as_bytes().to_vec());
        assert!(greater > original);
    }

    // Plain ASCII: only the last character is bumped.
    test_inc("hello", "hellp");

    // A trailing U+007F is dropped and the preceding character is bumped instead.
    test_inc("a\u{7f}", "b");

    // Nothing is returned when every character is U+007F.
    assert!(increment_utf8("\u{7f}\u{7f}").is_none());

    test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");

    test_inc("éééé", "éééê");

    test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");

    // Same pattern with U+07FF as the trailing character.
    test_inc("a\u{7ff}", "b");

    assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());

    test_inc("ࠀࠀ", "ࠀࠁ");

    // Same pattern with U+FFFF as the trailing character.
    test_inc("a\u{ffff}", "b");

    assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());

    test_inc("𐀀𐀀", "𐀀𐀁");

    // Same pattern with U+10FFFF as the trailing character.
    test_inc("a\u{10ffff}", "b");

    assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());

    // A trailing U+D7FF is also dropped in favour of bumping the preceding character.
    test_inc("a\u{D7FF}", "b");
}
3481
/// Checks `truncate_utf8` and `truncate_and_increment_utf8` on multi-byte strings,
/// including inputs for which no result is returned.
#[test]
fn test_truncate_utf8() {
    let data = "❤️🧡💛💚💙💜";

    // Truncating to the full length returns the input unchanged.
    let untouched = truncate_utf8(data, data.len()).unwrap();
    assert_eq!(untouched.len(), data.len());
    assert_eq!(&untouched, data.as_bytes());

    // A 13-byte limit yields only 10 bytes: the cut backs off to a character boundary.
    let shortened = truncate_utf8(data, 13).unwrap();
    assert_eq!(shortened.len(), 10);
    assert_eq!(&shortened, "❤️🧡".as_bytes());

    // A one-byte limit cannot hold this multi-byte character.
    let too_short = truncate_utf8("\u{0836}", 1);
    assert!(too_short.is_none());

    let ascii = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
    assert_eq!(&ascii, "yyyyyyyz".as_bytes());

    let two_byte = truncate_and_increment_utf8("ééééé", 7).unwrap();
    assert_eq!(&two_byte, "ééê".as_bytes());

    let widened = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
    assert_eq!(&widened, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());

    let two_byte_max = truncate_and_increment_utf8("߿߿߿߿߿", 8);
    assert!(two_byte_max.is_none());

    let three_byte = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
    assert_eq!(&three_byte, "ࠀࠁ".as_bytes());

    let three_byte_max = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
    assert!(three_byte_max.is_none());

    let four_byte = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
    assert_eq!(&four_byte, "𐀀𐀁".as_bytes());

    let four_byte_max = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
    assert!(four_byte_max.is_none());
}
3533
/// Writes byte arrays that are not valid UTF-8 into a column declared as UTF8 and checks
/// that the chunk max statistic is truncated to 8 bytes with the last byte incremented.
#[test]
fn test_byte_array_truncate_invalid_utf8_statistics() {
    let message_type = "
        message test_schema {
            OPTIONAL BYTE_ARRAY a (UTF8);
        }
    ";
    let schema = Arc::new(parse_message_type(message_type).unwrap());

    // Seven values of 32 bytes of 0x80 each (not valid UTF-8), spread over ten slots;
    // the three zero definition levels mark nulls.
    let data = vec![ByteArray::from(vec![128u8; 32]); 7];
    let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];

    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Chunk)
            .set_statistics_truncate_length(Some(8))
            .build(),
    );
    let file: File = tempfile::tempfile().unwrap();

    let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
    let mut row_group_writer = writer.next_row_group().unwrap();
    let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
    col_writer
        .typed::<ByteArrayType>()
        .write_batch(&data, Some(&def_levels), None)
        .unwrap();
    col_writer.close().unwrap();
    row_group_writer.close().unwrap();
    let file_metadata = writer.close().unwrap();

    let column_meta = file_metadata.row_groups[0].columns[0].meta_data.as_ref();
    assert!(column_meta.is_some());
    let stats = column_meta.unwrap().statistics.as_ref().unwrap();

    assert!(!stats.is_max_value_exact.unwrap());
    // First eight bytes of the value, with the final byte bumped from 128 to 129.
    assert_eq!(
        stats.max_value,
        Some(vec![128, 128, 128, 128, 128, 128, 128, 129])
    );
}
3583
/// Checks that `increment` carries past trailing 0xFF bytes and returns nothing when
/// every byte is already 0xFF.
#[test]
fn test_increment_max_binary_chars() {
    // The two trailing 0xFF bytes wrap to 0x00 and 0xFD becomes 0xFE.
    let carried = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
    assert_eq!(&carried.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);

    let saturated = increment(vec![0xFF, 0xFF, 0xFF]);
    assert!(saturated.is_none());
}
3592
/// With statistics disabled, closing the writer must still yield an offset index but
/// no column index.
#[test]
fn test_no_column_index_when_stats_disabled() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .build(),
    );
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        props,
        get_test_page_writer(),
    ));

    // Ten slots at definition level 0 (max level is 1) and no values.
    let no_values: Vec<i32> = Vec::new();
    let def_levels = [0i16; 10];
    writer
        .write_batch(&no_values, Some(&def_levels), None)
        .unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_some());
    assert!(close_result.column_index.is_none());
}
3616
/// With statistics disabled and the offset index disabled, closing the writer must
/// yield neither an offset index nor a column index.
#[test]
fn test_no_offset_index_when_disabled() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::None)
            .set_offset_index_disabled(true)
            .build(),
    );
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        props,
        get_test_page_writer(),
    ));

    // Ten slots at definition level 0 (max level is 1) and no values.
    let no_values: Vec<i32> = Vec::new();
    let def_levels = [0i16; 10];
    writer
        .write_batch(&no_values, Some(&def_levels), None)
        .unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_none());
    assert!(close_result.column_index.is_none());
}
3639
/// With page-level statistics enabled, asking to disable the offset index has no
/// effect: both the offset index and the column index are produced.
#[test]
fn test_offset_index_overridden() {
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Page)
            .set_offset_index_disabled(true)
            .build(),
    );
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
    let mut writer = get_typed_column_writer::<Int32Type>(get_column_writer(
        descr,
        props,
        get_test_page_writer(),
    ));

    // Ten slots at definition level 0 (max level is 1) and no values.
    let no_values: Vec<i32> = Vec::new();
    let def_levels = [0i16; 10];
    writer
        .write_batch(&no_values, Some(&def_levels), None)
        .unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert!(close_result.offset_index.is_some());
    assert!(close_result.column_index.is_some());
}
3662
/// Writes several page layouts for an Int32 column and checks the boundary order
/// recorded in the resulting column index.
#[test]
fn test_boundary_order() -> Result<()> {
    let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));

    // Min and max never decrease across the non-null pages.
    let closed = write_multiple_pages::<Int32Type>(
        &descr,
        &[
            &[Some(-10), Some(10)],
            &[Some(-5), Some(11)],
            &[None],
            &[Some(-5), Some(11)],
        ],
    )?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::ASCENDING
    );

    // Min and max never increase across the non-null pages.
    let closed = write_multiple_pages::<Int32Type>(
        &descr,
        &[
            &[Some(10), Some(11)],
            &[Some(5), Some(11)],
            &[None],
            &[Some(-5), Some(0)],
        ],
    )?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::DESCENDING
    );

    // Identical non-null pages are reported as ascending.
    let closed = write_multiple_pages::<Int32Type>(
        &descr,
        &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
    )?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::ASCENDING
    );

    // Only null pages.
    let closed = write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::ASCENDING
    );

    // A single non-null page.
    let closed = write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::ASCENDING
    );

    // A single non-null page followed by a null page.
    let closed =
        write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::ASCENDING
    );

    // Values rise and then fall.
    let closed = write_multiple_pages::<Int32Type>(
        &descr,
        &[
            &[Some(10), Some(11)],
            &[Some(11), Some(16)],
            &[None],
            &[Some(-5), Some(0)],
        ],
    )?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::UNORDERED
    );

    // Mins rise while maxes fall.
    let closed = write_multiple_pages::<Int32Type>(
        &descr,
        &[
            &[Some(1), Some(9)],
            &[Some(2), Some(8)],
            &[None],
            &[Some(3), Some(7)],
        ],
    )?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::UNORDERED
    );

    Ok(())
}
3746
/// Writes the same four two-byte pages (f16 1, 0, -0, -1) through a Float16 column and
/// through a plain fixed-length column, and checks that the reported boundary order
/// differs between the two.
#[test]
fn test_boundary_order_logical_type() -> Result<()> {
    let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));

    // Same physical type and length as the Float16 column, but without a logical type.
    let plain_type = SchemaType::primitive_type_builder(
        "col",
        FixedLenByteArrayType::get_physical_type(),
    )
    .with_length(2)
    .build()?;
    let fba_descr = Arc::new(ColumnDescriptor::new(
        Arc::new(plain_type),
        1,
        0,
        ColumnPath::from("col"),
    ));

    let page_value = |v: f16| Some(FixedLenByteArray::from(ByteArray::from(v)));
    let values: &[&[Option<FixedLenByteArray>]] = &[
        &[page_value(f16::ONE)],
        &[page_value(f16::ZERO)],
        &[page_value(f16::NEG_ZERO)],
        &[page_value(f16::NEG_ONE)],
    ];

    // With the Float16 logical type the pages are reported as descending.
    let closed = write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::DESCENDING
    );

    // Without a logical type the same pages are reported as unordered.
    let closed = write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
    assert_eq!(
        closed.column_index.unwrap().boundary_order,
        BoundaryOrder::UNORDERED
    );

    Ok(())
}
3790
/// Writes three 12-byte values into an INTERVAL column and checks that the resulting
/// statistics expose neither min nor max bytes.
#[test]
fn test_interval_stats_should_not_have_min_max() {
    // Three 12-byte values that differ only in the last byte (0, 1, 2).
    let input: Vec<FixedLenByteArray> = (0u8..3)
        .map(|last| {
            let mut bytes = vec![0u8; 12];
            bytes[11] = last;
            ByteArray::from(bytes).into()
        })
        .collect();

    let mut writer = get_test_interval_column_writer(get_test_page_writer());
    writer.write_batch(&input, None, None).unwrap();

    let metadata = writer.close().unwrap().metadata;
    let stats = match metadata.statistics() {
        Some(Statistics::FixedLenByteArray(inner)) => inner.clone(),
        _ => panic!("metadata missing statistics"),
    };
    assert!(stats.min_bytes_opt().is_none());
    assert!(stats.max_bytes_opt().is_none());
}
3815
/// Checks the value reported by `get_estimated_total_bytes` before any write and
/// after each of two data pages has been added.
#[test]
#[cfg(feature = "arrow")]
fn test_column_writer_get_estimated_total_bytes() {
    let mut writer =
        get_test_column_writer::<Int32Type>(get_test_page_writer(), 0, 0, Default::default());
    assert_eq!(writer.get_estimated_total_bytes(), 0);

    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer.add_data_page().unwrap();
    let after_first_page = writer.get_estimated_total_bytes();
    assert_eq!(after_first_page, 20);

    writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
    writer.add_data_page().unwrap();
    let after_second_page = writer.get_estimated_total_bytes();
    // The second page adds 21 to the estimate, versus 20 for the first.
    assert_eq!(after_second_page, 20 + 21);
}
3835
3836 fn write_multiple_pages<T: DataType>(
3837 column_descr: &Arc<ColumnDescriptor>,
3838 pages: &[&[Option<T::T>]],
3839 ) -> Result<ColumnCloseResult> {
3840 let column_writer = get_column_writer(
3841 column_descr.clone(),
3842 Default::default(),
3843 get_test_page_writer(),
3844 );
3845 let mut writer = get_typed_column_writer::<T>(column_writer);
3846
3847 for &page in pages {
3848 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3849 let def_levels = page
3850 .iter()
3851 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3852 .collect::<Vec<_>>();
3853 writer.write_batch(&values, Some(&def_levels), None)?;
3854 writer.flush_data_pages()?;
3855 }
3856
3857 writer.close()
3858 }
3859
3860 fn column_roundtrip_random<T: DataType>(
3864 props: WriterProperties,
3865 max_size: usize,
3866 min_value: T::T,
3867 max_value: T::T,
3868 max_def_level: i16,
3869 max_rep_level: i16,
3870 ) where
3871 T::T: PartialOrd + SampleUniform + Copy,
3872 {
3873 let mut num_values: usize = 0;
3874
3875 let mut buf: Vec<i16> = Vec::new();
3876 let def_levels = if max_def_level > 0 {
3877 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3878 for &dl in &buf[..] {
3879 if dl == max_def_level {
3880 num_values += 1;
3881 }
3882 }
3883 Some(&buf[..])
3884 } else {
3885 num_values = max_size;
3886 None
3887 };
3888
3889 let mut buf: Vec<i16> = Vec::new();
3890 let rep_levels = if max_rep_level > 0 {
3891 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3892 buf[0] = 0; Some(&buf[..])
3894 } else {
3895 None
3896 };
3897
3898 let mut values: Vec<T::T> = Vec::new();
3899 random_numbers_range(num_values, min_value, max_value, &mut values);
3900
3901 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3902 }
3903
3904 fn column_roundtrip<T: DataType>(
3906 props: WriterProperties,
3907 values: &[T::T],
3908 def_levels: Option<&[i16]>,
3909 rep_levels: Option<&[i16]>,
3910 ) {
3911 let mut file = tempfile::tempfile().unwrap();
3912 let mut write = TrackedWrite::new(&mut file);
3913 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3914
3915 let max_def_level = match def_levels {
3916 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3917 None => 0i16,
3918 };
3919
3920 let max_rep_level = match rep_levels {
3921 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3922 None => 0i16,
3923 };
3924
3925 let mut max_batch_size = values.len();
3926 if let Some(levels) = def_levels {
3927 max_batch_size = max_batch_size.max(levels.len());
3928 }
3929 if let Some(levels) = rep_levels {
3930 max_batch_size = max_batch_size.max(levels.len());
3931 }
3932
3933 let mut writer =
3934 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3935
3936 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3937 assert_eq!(values_written, values.len());
3938 let result = writer.close().unwrap();
3939
3940 drop(write);
3941
3942 let props = ReaderProperties::builder()
3943 .set_backward_compatible_lz4(false)
3944 .build();
3945 let page_reader = Box::new(
3946 SerializedPageReader::new_with_properties(
3947 Arc::new(file),
3948 &result.metadata,
3949 result.rows_written as usize,
3950 None,
3951 Arc::new(props),
3952 )
3953 .unwrap(),
3954 );
3955 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3956
3957 let mut actual_values = Vec::with_capacity(max_batch_size);
3958 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3959 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3960
3961 let (_, values_read, levels_read) = reader
3962 .read_records(
3963 max_batch_size,
3964 actual_def_levels.as_mut(),
3965 actual_rep_levels.as_mut(),
3966 &mut actual_values,
3967 )
3968 .unwrap();
3969
3970 assert_eq!(&actual_values[..values_read], values);
3973 match actual_def_levels {
3974 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3975 None => assert_eq!(None, def_levels),
3976 }
3977 match actual_rep_levels {
3978 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3979 None => assert_eq!(None, rep_levels),
3980 }
3981
3982 if let Some(levels) = actual_rep_levels {
3985 let mut actual_rows_written = 0;
3986 for l in levels {
3987 if l == 0 {
3988 actual_rows_written += 1;
3989 }
3990 }
3991 assert_eq!(actual_rows_written, result.rows_written);
3992 } else if actual_def_levels.is_some() {
3993 assert_eq!(levels_read as u64, result.rows_written);
3994 } else {
3995 assert_eq!(values_read as u64, result.rows_written);
3996 }
3997 }
3998
3999 fn column_write_and_get_metadata<T: DataType>(
4002 props: WriterProperties,
4003 values: &[T::T],
4004 ) -> ColumnChunkMetaData {
4005 let page_writer = get_test_page_writer();
4006 let props = Arc::new(props);
4007 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4008 writer.write_batch(values, None, None).unwrap();
4009 writer.close().unwrap().metadata
4010 }
4011
4012 fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
4014 PageEncodingStats {
4015 page_type,
4016 encoding,
4017 count,
4018 }
4019 }
4020
4021 fn check_encoding_write_support<T: DataType>(
4025 version: WriterVersion,
4026 dict_enabled: bool,
4027 data: &[T::T],
4028 dictionary_page_offset: Option<i64>,
4029 encodings: &[Encoding],
4030 page_encoding_stats: &[PageEncodingStats],
4031 ) {
4032 let props = WriterProperties::builder()
4033 .set_writer_version(version)
4034 .set_dictionary_enabled(dict_enabled)
4035 .build();
4036 let meta = column_write_and_get_metadata::<T>(props, data);
4037 assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
4038 assert_eq!(meta.encodings(), encodings);
4039 assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
4040 }
4041
4042 fn get_test_column_writer<'a, T: DataType>(
4044 page_writer: Box<dyn PageWriter + 'a>,
4045 max_def_level: i16,
4046 max_rep_level: i16,
4047 props: WriterPropertiesPtr,
4048 ) -> ColumnWriterImpl<'a, T> {
4049 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4050 let column_writer = get_column_writer(descr, props, page_writer);
4051 get_typed_column_writer::<T>(column_writer)
4052 }
4053
4054 fn get_test_column_reader<T: DataType>(
4056 page_reader: Box<dyn PageReader>,
4057 max_def_level: i16,
4058 max_rep_level: i16,
4059 ) -> ColumnReaderImpl<T> {
4060 let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
4061 let column_reader = get_column_reader(descr, page_reader);
4062 get_typed_column_reader::<T>(column_reader)
4063 }
4064
4065 fn get_test_column_descr<T: DataType>(
4067 max_def_level: i16,
4068 max_rep_level: i16,
4069 ) -> ColumnDescriptor {
4070 let path = ColumnPath::from("col");
4071 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4072 .with_length(1)
4075 .build()
4076 .unwrap();
4077 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4078 }
4079
4080 fn get_test_page_writer() -> Box<dyn PageWriter> {
4082 Box::new(TestPageWriter {})
4083 }
4084
4085 struct TestPageWriter {}
4086
4087 impl PageWriter for TestPageWriter {
4088 fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
4089 let mut res = PageWriteSpec::new();
4090 res.page_type = page.page_type();
4091 res.uncompressed_size = page.uncompressed_size();
4092 res.compressed_size = page.compressed_size();
4093 res.num_values = page.num_values();
4094 res.offset = 0;
4095 res.bytes_written = page.data().len() as u64;
4096 Ok(res)
4097 }
4098
4099 fn close(&mut self) -> Result<()> {
4100 Ok(())
4101 }
4102 }
4103
4104 fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
4106 let page_writer = get_test_page_writer();
4107 let props = Default::default();
4108 let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
4109 writer.write_batch(values, None, None).unwrap();
4110
4111 let metadata = writer.close().unwrap().metadata;
4112 if let Some(stats) = metadata.statistics() {
4113 stats.clone()
4114 } else {
4115 panic!("metadata missing statistics");
4116 }
4117 }
4118
4119 fn get_test_decimals_column_writer<T: DataType>(
4121 page_writer: Box<dyn PageWriter>,
4122 max_def_level: i16,
4123 max_rep_level: i16,
4124 props: WriterPropertiesPtr,
4125 ) -> ColumnWriterImpl<'static, T> {
4126 let descr = Arc::new(get_test_decimals_column_descr::<T>(
4127 max_def_level,
4128 max_rep_level,
4129 ));
4130 let column_writer = get_column_writer(descr, props, page_writer);
4131 get_typed_column_writer::<T>(column_writer)
4132 }
4133
4134 fn get_test_decimals_column_descr<T: DataType>(
4136 max_def_level: i16,
4137 max_rep_level: i16,
4138 ) -> ColumnDescriptor {
4139 let path = ColumnPath::from("col");
4140 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4141 .with_length(16)
4142 .with_logical_type(Some(LogicalType::Decimal {
4143 scale: 2,
4144 precision: 3,
4145 }))
4146 .with_scale(2)
4147 .with_precision(3)
4148 .build()
4149 .unwrap();
4150 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4151 }
4152
4153 fn float16_statistics_roundtrip(
4154 values: &[FixedLenByteArray],
4155 ) -> ValueStatistics<FixedLenByteArray> {
4156 let page_writer = get_test_page_writer();
4157 let mut writer = get_test_float16_column_writer(page_writer, Default::default());
4158 writer.write_batch(values, None, None).unwrap();
4159
4160 let metadata = writer.close().unwrap().metadata;
4161 if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
4162 stats.clone()
4163 } else {
4164 panic!("metadata missing statistics");
4165 }
4166 }
4167
4168 fn get_test_float16_column_writer(
4169 page_writer: Box<dyn PageWriter>,
4170 props: WriterPropertiesPtr,
4171 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4172 let descr = Arc::new(get_test_float16_column_descr(0, 0));
4173 let column_writer = get_column_writer(descr, props, page_writer);
4174 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4175 }
4176
4177 fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
4178 let path = ColumnPath::from("col");
4179 let tpe =
4180 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4181 .with_length(2)
4182 .with_logical_type(Some(LogicalType::Float16))
4183 .build()
4184 .unwrap();
4185 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4186 }
4187
4188 fn get_test_interval_column_writer(
4189 page_writer: Box<dyn PageWriter>,
4190 ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
4191 let descr = Arc::new(get_test_interval_column_descr());
4192 let column_writer = get_column_writer(descr, Default::default(), page_writer);
4193 get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
4194 }
4195
4196 fn get_test_interval_column_descr() -> ColumnDescriptor {
4197 let path = ColumnPath::from("col");
4198 let tpe =
4199 SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
4200 .with_length(12)
4201 .with_converted_type(ConvertedType::INTERVAL)
4202 .build()
4203 .unwrap();
4204 ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
4205 }
4206
4207 fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
4209 page_writer: Box<dyn PageWriter + 'a>,
4210 max_def_level: i16,
4211 max_rep_level: i16,
4212 props: WriterPropertiesPtr,
4213 ) -> ColumnWriterImpl<'a, T> {
4214 let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
4215 max_def_level,
4216 max_rep_level,
4217 ));
4218 let column_writer = get_column_writer(descr, props, page_writer);
4219 get_typed_column_writer::<T>(column_writer)
4220 }
4221
4222 fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
4224 max_def_level: i16,
4225 max_rep_level: i16,
4226 ) -> ColumnDescriptor {
4227 let path = ColumnPath::from("col");
4228 let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
4229 .with_converted_type(ConvertedType::UINT_32)
4230 .build()
4231 .unwrap();
4232 ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
4233 }
4234}