use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
use std::str;

use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type};
use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter};
use crate::column::writer::encoder::{ColumnValueEncoder, ColumnValueEncoderImpl, ColumnValues};
use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
#[cfg(feature = "encryption")]
use crate::encryption::encrypt::get_column_crypto_metadata;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{
    ColumnChunkMetaData, ColumnChunkMetaDataBuilder, ColumnIndexBuilder, LevelHistogram,
    OffsetIndexBuilder,
};
use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::properties::{
    EnabledStatistics, WriterProperties, WriterPropertiesPtr, WriterVersion,
};
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};

pub(crate) mod encoder;

macro_rules! downcast_writer {
    ($e:expr, $i:ident, $b:expr) => {
        match $e {
            Self::BoolColumnWriter($i) => $b,
            Self::Int32ColumnWriter($i) => $b,
            Self::Int64ColumnWriter($i) => $b,
            Self::Int96ColumnWriter($i) => $b,
            Self::FloatColumnWriter($i) => $b,
            Self::DoubleColumnWriter($i) => $b,
            Self::ByteArrayColumnWriter($i) => $b,
            Self::FixedLenByteArrayColumnWriter($i) => $b,
        }
    };
}

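/// Column writer for a Parquet type.
///
/// Wraps a [`ColumnWriterImpl`] for each supported physical type so that a
/// writer can be handled generically and downcast to the typed writer when needed.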
pub enum ColumnWriter<'a> {
    BoolColumnWriter(ColumnWriterImpl<'a, BoolType>),
    Int32ColumnWriter(ColumnWriterImpl<'a, Int32Type>),
    Int64ColumnWriter(ColumnWriterImpl<'a, Int64Type>),
    Int96ColumnWriter(ColumnWriterImpl<'a, Int96Type>),
    FloatColumnWriter(ColumnWriterImpl<'a, FloatType>),
    DoubleColumnWriter(ColumnWriterImpl<'a, DoubleType>),
    ByteArrayColumnWriter(ColumnWriterImpl<'a, ByteArrayType>),
    FixedLenByteArrayColumnWriter(ColumnWriterImpl<'a, FixedLenByteArrayType>),
}

impl ColumnWriter<'_> {
    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        downcast_writer!(self, typed, typed.memory_size())
    }

    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        downcast_writer!(self, typed, typed.get_estimated_total_bytes())
    }

    pub fn close(self) -> Result<ColumnCloseResult> {
        downcast_writer!(self, typed, typed.close())
    }
}

#[deprecated(
    since = "54.0.0",
    note = "Seems like a stray enum that nobody knows the purpose of. Will be removed in the next release."
)]
#[allow(missing_docs)]
pub enum Level {
    Page,
    Column,
}

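/// Gets a column writer for the given column descriptor, writer properties and
/// page writer, dispatching on the column's physical type.
///
/// A minimal usage sketch (assuming a `descr: ColumnDescPtr`, `props: WriterPropertiesPtr`
/// and a boxed `PageWriter` have already been constructed elsewhere; the snippet is
/// illustrative rather than a complete, runnable program):
///
/// ```ignore
/// let writer = get_column_writer(descr, props, page_writer);
/// if let ColumnWriter::Int32ColumnWriter(mut typed) = writer {
///     // Write a batch of required INT32 values (no definition/repetition levels).
///     typed.write_batch(&[1, 2, 3], None, None)?;
///     let result = typed.close()?;
///     println!("rows written: {}", result.rows_written);
/// }
/// ```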
pub fn get_column_writer<'a>(
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    page_writer: Box<dyn PageWriter + 'a>,
) -> ColumnWriter<'a> {
    match descr.physical_type() {
        Type::BOOLEAN => {
            ColumnWriter::BoolColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT32 => {
            ColumnWriter::Int32ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT64 => {
            ColumnWriter::Int64ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::INT96 => {
            ColumnWriter::Int96ColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FLOAT => {
            ColumnWriter::FloatColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::DOUBLE => {
            ColumnWriter::DoubleColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::BYTE_ARRAY => {
            ColumnWriter::ByteArrayColumnWriter(ColumnWriterImpl::new(descr, props, page_writer))
        }
        Type::FIXED_LEN_BYTE_ARRAY => ColumnWriter::FixedLenByteArrayColumnWriter(
            ColumnWriterImpl::new(descr, props, page_writer),
        ),
    }
}

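/// Gets a typed column writer for the specific type `T`, by "up-casting"
/// [`ColumnWriter`]. Panics if the actual enum value does not match the
/// requested type `T`.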
pub fn get_typed_column_writer<T: DataType>(col_writer: ColumnWriter) -> ColumnWriterImpl<T> {
    T::get_column_writer(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

pub fn get_typed_column_writer_ref<'a, 'b: 'a, T: DataType>(
    col_writer: &'b ColumnWriter<'a>,
) -> &'b ColumnWriterImpl<'a, T> {
    T::get_column_writer_ref(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

pub fn get_typed_column_writer_mut<'a, 'b: 'a, T: DataType>(
    col_writer: &'a mut ColumnWriter<'b>,
) -> &'a mut ColumnWriterImpl<'b, T> {
    T::get_column_writer_mut(col_writer).unwrap_or_else(|| {
        panic!(
            "Failed to convert column writer into a typed column writer for `{}` type",
            T::get_physical_type()
        )
    })
}

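/// Metadata returned by [`GenericColumnWriter::close`]: the number of bytes and
/// rows written, the finalised column chunk metadata, and the optional bloom
/// filter, column index and offset index for the column chunk.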
#[derive(Debug, Clone)]
pub struct ColumnCloseResult {
    pub bytes_written: u64,
    pub rows_written: u64,
    pub metadata: ColumnChunkMetaData,
    pub bloom_filter: Option<Sbbf>,
    pub column_index: Option<ColumnIndex>,
    pub offset_index: Option<OffsetIndex>,
}

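/// Counters and level histograms tracked for the data page currently being
/// buffered; reset via `new_page` each time a data page is finalised.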
#[derive(Default)]
struct PageMetrics {
    num_buffered_values: u32,
    num_buffered_rows: u32,
    num_page_nulls: u64,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl PageMetrics {
    fn new() -> Self {
        Default::default()
    }

    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    fn new_page(&mut self) {
        self.num_buffered_values = 0;
        self.num_buffered_rows = 0;
        self.num_page_nulls = 0;
        if let Some(histogram) = self.repetition_level_histogram.as_mut() {
            histogram.reset();
        }
        if let Some(histogram) = self.definition_level_histogram.as_mut() {
            histogram.reset();
        }
    }

    fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut rep_hist) = self.repetition_level_histogram {
            rep_hist.update_from_levels(levels);
        }
    }

    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut def_hist) = self.definition_level_histogram {
            def_hist.update_from_levels(levels);
        }
    }
}

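/// Aggregated counters, offsets, statistics and level histograms for the whole
/// column chunk, updated as pages are flushed.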
#[derive(Default)]
struct ColumnMetrics<T: Default> {
    total_bytes_written: u64,
    total_rows_written: u64,
    total_uncompressed_size: u64,
    total_compressed_size: u64,
    total_num_values: u64,
    dictionary_page_offset: Option<u64>,
    data_page_offset: Option<u64>,
    min_column_value: Option<T>,
    max_column_value: Option<T>,
    num_column_nulls: u64,
    column_distinct_count: Option<u64>,
    variable_length_bytes: Option<i64>,
    repetition_level_histogram: Option<LevelHistogram>,
    definition_level_histogram: Option<LevelHistogram>,
}

impl<T: Default> ColumnMetrics<T> {
    fn new() -> Self {
        Default::default()
    }

    fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
        self.repetition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
        self.definition_level_histogram = LevelHistogram::try_new(max_level);
        self
    }

    fn update_histogram(
        chunk_histogram: &mut Option<LevelHistogram>,
        page_histogram: &Option<LevelHistogram>,
    ) {
        if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
            chunk_hist.add(page_hist);
        }
    }

    fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
        ColumnMetrics::<T>::update_histogram(
            &mut self.definition_level_histogram,
            &page_metrics.definition_level_histogram,
        );
        ColumnMetrics::<T>::update_histogram(
            &mut self.repetition_level_histogram,
            &page_metrics.repetition_level_histogram,
        );
    }

    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
        if let Some(var_bytes) = variable_length_bytes {
            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
        }
    }
}

pub type ColumnWriterImpl<'a, T> = GenericColumnWriter<'a, ColumnValueEncoderImpl<T>>;

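/// Typed column writer for a primitive Parquet column, generic over the value
/// encoder used to buffer and encode values, levels and statistics.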
pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
    descr: ColumnDescPtr,
    props: WriterPropertiesPtr,
    statistics_enabled: EnabledStatistics,

    page_writer: Box<dyn PageWriter + 'a>,
    codec: Compression,
    compressor: Option<Box<dyn Codec>>,
    encoder: E,

    page_metrics: PageMetrics,
    column_metrics: ColumnMetrics<E::T>,

    encodings: BTreeSet<Encoding>,
    encoding_stats: Vec<PageEncodingStats>,
    def_levels_sink: Vec<i16>,
    rep_levels_sink: Vec<i16>,
    data_pages: VecDeque<CompressedPage>,
    column_index_builder: ColumnIndexBuilder,
    offset_index_builder: Option<OffsetIndexBuilder>,

    data_page_boundary_ascending: bool,
    data_page_boundary_descending: bool,
    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
    pub fn new(
        descr: ColumnDescPtr,
        props: WriterPropertiesPtr,
        page_writer: Box<dyn PageWriter + 'a>,
    ) -> Self {
        let codec = props.compression(descr.path());
        let codec_options = CodecOptionsBuilder::default().build();
        let compressor = create_codec(codec, &codec_options).unwrap();
        let encoder = E::try_new(&descr, props.as_ref()).unwrap();

        let statistics_enabled = props.statistics_enabled(descr.path());

        let mut encodings = BTreeSet::new();
        encodings.insert(Encoding::RLE);

        let mut page_metrics = PageMetrics::new();
        let mut column_metrics = ColumnMetrics::<E::T>::new();

        if statistics_enabled != EnabledStatistics::None {
            page_metrics = page_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
            column_metrics = column_metrics
                .with_repetition_level_histogram(descr.max_rep_level())
                .with_definition_level_histogram(descr.max_def_level());
        }

        let mut column_index_builder = ColumnIndexBuilder::new();
        if statistics_enabled != EnabledStatistics::Page {
            column_index_builder.to_invalid()
        }

        let offset_index_builder = match props.offset_index_disabled() {
            false => Some(OffsetIndexBuilder::new()),
            _ => None,
        };

        Self {
            descr,
            props,
            statistics_enabled,
            page_writer,
            codec,
            compressor,
            encoder,
            def_levels_sink: vec![],
            rep_levels_sink: vec![],
            data_pages: VecDeque::new(),
            page_metrics,
            column_metrics,
            column_index_builder,
            offset_index_builder,
            encodings,
            encoding_stats: vec![],
            data_page_boundary_ascending: true,
            data_page_boundary_descending: true,
            last_non_null_data_page_min_max: None,
        }
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn write_batch_internal(
        &mut self,
        values: &E::Values,
        value_indices: Option<&[usize]>,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        if let (Some(def), Some(rep)) = (def_levels, rep_levels) {
            if def.len() != rep.len() {
                return Err(general_err!(
                    "Inconsistent length of definition and repetition levels: {} != {}",
                    def.len(),
                    rep.len()
                ));
            }
        }

        let num_levels = match def_levels {
            Some(def_levels) => def_levels.len(),
            None => values.len(),
        };

        if let Some(min) = min {
            update_min(&self.descr, min, &mut self.column_metrics.min_column_value);
        }
        if let Some(max) = max {
            update_max(&self.descr, max, &mut self.column_metrics.max_column_value);
        }

        if self.encoder.num_values() == 0 {
            self.column_metrics.column_distinct_count = distinct_count;
        } else {
            self.column_metrics.column_distinct_count = None;
        }

        let mut values_offset = 0;
        let mut levels_offset = 0;
        let base_batch_size = self.props.write_batch_size();
        while levels_offset < num_levels {
            let mut end_offset = num_levels.min(levels_offset + base_batch_size);

            if let Some(r) = rep_levels {
                while end_offset < r.len() && r[end_offset] != 0 {
                    end_offset += 1;
                }
            }

            values_offset += self.write_mini_batch(
                values,
                values_offset,
                value_indices,
                end_offset - levels_offset,
                def_levels.map(|lv| &lv[levels_offset..end_offset]),
                rep_levels.map(|lv| &lv[levels_offset..end_offset]),
            )?;
            levels_offset = end_offset;
        }

        Ok(values_offset)
    }

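    /// Writes a batch of values, definition levels and repetition levels, and
    /// returns the number of values written.
    ///
    /// A hedged sketch of the level semantics for an optional (max definition
    /// level = 1), non-repeated column; values are only supplied for non-null
    /// slots and the concrete numbers below are illustrative:
    ///
    /// ```ignore
    /// // Logical column: [Some(1), None, Some(3)]
    /// // One definition level per slot; no repetition levels for a flat column.
    /// writer.write_batch(&[1, 3], Some(&[1, 0, 1]), None)?;
    /// ```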
    pub fn write_batch(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
    }

    pub fn write_batch_with_statistics(
        &mut self,
        values: &E::Values,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
        min: Option<&E::T>,
        max: Option<&E::T>,
        distinct_count: Option<u64>,
    ) -> Result<usize> {
        self.write_batch_internal(
            values,
            None,
            def_levels,
            rep_levels,
            min,
            max,
            distinct_count,
        )
    }

    #[cfg(feature = "arrow")]
    pub(crate) fn memory_size(&self) -> usize {
        self.column_metrics.total_bytes_written as usize + self.encoder.estimated_memory_size()
    }

    pub fn get_total_bytes_written(&self) -> u64 {
        self.column_metrics.total_bytes_written
    }

    #[cfg(feature = "arrow")]
    pub(crate) fn get_estimated_total_bytes(&self) -> u64 {
        self.data_pages
            .iter()
            .map(|page| page.data().len() as u64)
            .sum::<u64>()
            + self.column_metrics.total_bytes_written
            + self.encoder.estimated_data_page_size() as u64
            + self.encoder.estimated_dict_page_size().unwrap_or_default() as u64
    }

    pub fn get_total_rows_written(&self) -> u64 {
        self.column_metrics.total_rows_written
    }

    pub fn get_descriptor(&self) -> &ColumnDescPtr {
        &self.descr
    }

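    /// Finalises writes and closes the column writer. Flushes any buffered data
    /// and the dictionary page (if present), then returns the total bytes written,
    /// total rows written and the finalised column chunk metadata.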
    pub fn close(mut self) -> Result<ColumnCloseResult> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        if self.encoder.has_dictionary() {
            self.write_dictionary_page()?;
        }
        self.flush_data_pages()?;
        let metadata = self.build_column_metadata()?;
        self.page_writer.close()?;

        let boundary_order = match (
            self.data_page_boundary_ascending,
            self.data_page_boundary_descending,
        ) {
            (true, _) => BoundaryOrder::ASCENDING,
            (false, true) => BoundaryOrder::DESCENDING,
            (false, false) => BoundaryOrder::UNORDERED,
        };
        self.column_index_builder.set_boundary_order(boundary_order);

        let column_index = self
            .column_index_builder
            .valid()
            .then(|| self.column_index_builder.build_to_thrift());

        let offset_index = self.offset_index_builder.map(|b| b.build_to_thrift());

        Ok(ColumnCloseResult {
            bytes_written: self.column_metrics.total_bytes_written,
            rows_written: self.column_metrics.total_rows_written,
            bloom_filter: self.encoder.flush_bloom_filter(),
            metadata,
            column_index,
            offset_index,
        })
    }

    fn write_mini_batch(
        &mut self,
        values: &E::Values,
        values_offset: usize,
        value_indices: Option<&[usize]>,
        num_levels: usize,
        def_levels: Option<&[i16]>,
        rep_levels: Option<&[i16]>,
    ) -> Result<usize> {
        let values_to_write = if self.descr.max_def_level() > 0 {
            let levels = def_levels.ok_or_else(|| {
                general_err!(
                    "Definition levels are required, because max definition level = {}",
                    self.descr.max_def_level()
                )
            })?;

            let mut values_to_write = 0;
            for &level in levels {
                if level == self.descr.max_def_level() {
                    values_to_write += 1;
                } else {
                    self.page_metrics.num_page_nulls += 1
                }
            }

            self.page_metrics.update_definition_level_histogram(levels);

            self.def_levels_sink.extend_from_slice(levels);
            values_to_write
        } else {
            num_levels
        };

        if self.descr.max_rep_level() > 0 {
            let levels = rep_levels.ok_or_else(|| {
                general_err!(
                    "Repetition levels are required, because max repetition level = {}",
                    self.descr.max_rep_level()
                )
            })?;

            if !levels.is_empty() && levels[0] != 0 {
                return Err(general_err!(
                    "Write must start at a record boundary, got non-zero repetition level of {}",
                    levels[0]
                ));
            }

            for &level in levels {
                self.page_metrics.num_buffered_rows += (level == 0) as u32
            }

            self.page_metrics.update_repetition_level_histogram(levels);

            self.rep_levels_sink.extend_from_slice(levels);
        } else {
            self.page_metrics.num_buffered_rows += num_levels as u32;
        }

        match value_indices {
            Some(indices) => {
                let indices = &indices[values_offset..values_offset + values_to_write];
                self.encoder.write_gather(values, indices)?;
            }
            None => self.encoder.write(values, values_offset, values_to_write)?,
        }

        self.page_metrics.num_buffered_values += num_levels as u32;

        if self.should_add_data_page() {
            self.add_data_page()?;
        }

        if self.should_dict_fallback() {
            self.dict_fallback()?;
        }

        Ok(values_to_write)
    }

    #[inline]
    fn should_dict_fallback(&self) -> bool {
        match self.encoder.estimated_dict_page_size() {
            Some(size) => size >= self.props.dictionary_page_size_limit(),
            None => false,
        }
    }

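    /// Returns `true` if enough data has been buffered to flush a data page:
    /// either the configured row-count limit or the estimated data page size
    /// limit has been reached. An empty page is never flushed.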
    #[inline]
    fn should_add_data_page(&self) -> bool {
        if self.page_metrics.num_buffered_values == 0 {
            return false;
        }

        self.page_metrics.num_buffered_rows as usize >= self.props.data_page_row_count_limit()
            || self.encoder.estimated_data_page_size() >= self.props.data_page_size_limit()
    }

    fn dict_fallback(&mut self) -> Result<()> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }
        self.write_dictionary_page()?;
        self.flush_data_pages()?;
        Ok(())
    }

    fn update_column_offset_index(
        &mut self,
        page_statistics: Option<&ValueStatistics<E::T>>,
        page_variable_length_bytes: Option<i64>,
    ) {
        let null_page =
            (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
        if null_page && self.column_index_builder.valid() {
            self.column_index_builder.append(
                null_page,
                vec![],
                vec![],
                self.page_metrics.num_page_nulls as i64,
            );
        } else if self.column_index_builder.valid() {
            match &page_statistics {
                None => {
                    self.column_index_builder.to_invalid();
                }
                Some(stat) => {
                    let new_min = stat.min_opt().unwrap();
                    let new_max = stat.max_opt().unwrap();
                    if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
                        if self.data_page_boundary_ascending {
                            let not_ascending = compare_greater(&self.descr, last_min, new_min)
                                || compare_greater(&self.descr, last_max, new_max);
                            if not_ascending {
                                self.data_page_boundary_ascending = false;
                            }
                        }

                        if self.data_page_boundary_descending {
                            let not_descending = compare_greater(&self.descr, new_min, last_min)
                                || compare_greater(&self.descr, new_max, last_max);
                            if not_descending {
                                self.data_page_boundary_descending = false;
                            }
                        }
                    }
                    self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

                    if self.can_truncate_value() {
                        self.column_index_builder.append(
                            null_page,
                            self.truncate_min_value(
                                self.props.column_index_truncate_length(),
                                stat.min_bytes_opt().unwrap(),
                            )
                            .0,
                            self.truncate_max_value(
                                self.props.column_index_truncate_length(),
                                stat.max_bytes_opt().unwrap(),
                            )
                            .0,
                            self.page_metrics.num_page_nulls as i64,
                        );
                    } else {
                        self.column_index_builder.append(
                            null_page,
                            stat.min_bytes_opt().unwrap().to_vec(),
                            stat.max_bytes_opt().unwrap().to_vec(),
                            self.page_metrics.num_page_nulls as i64,
                        );
                    }
                }
            }
        }

        self.column_index_builder.append_histograms(
            &self.page_metrics.repetition_level_histogram,
            &self.page_metrics.definition_level_histogram,
        );

        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder.append_row_count(self.page_metrics.num_buffered_rows as i64);
            builder.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
        }
    }

    fn can_truncate_value(&self) -> bool {
        match self.descr.physical_type() {
            Type::FIXED_LEN_BYTE_ARRAY
                if !matches!(
                    self.descr.logical_type(),
                    Some(LogicalType::Decimal { .. }) | Some(LogicalType::Float16)
                ) =>
            {
                true
            }
            Type::BYTE_ARRAY => true,
            _ => false,
        }
    }

    fn is_utf8(&self) -> bool {
        self.get_descriptor().logical_type() == Some(LogicalType::String)
            || self.get_descriptor().converted_type() == ConvertedType::UTF8
    }

    fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l| {
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_utf8(str_data, l),
                        Err(_) => Some(data[..l].to_vec()),
                    }
                } else {
                    Some(data[..l].to_vec())
                }
            })
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
        truncation_length
            .filter(|l| data.len() > *l)
            .and_then(|l| {
                if self.is_utf8() {
                    match str::from_utf8(data) {
                        Ok(str_data) => truncate_and_increment_utf8(str_data, l),
                        Err(_) => increment(data[..l].to_vec()),
                    }
                } else {
                    increment(data[..l].to_vec())
                }
            })
            .map(|truncated| (truncated, true))
            .unwrap_or_else(|| (data.to_vec(), false))
    }

    fn truncate_statistics(&self, statistics: Statistics) -> Statistics {
        let backwards_compatible_min_max = self.descr.sort_order().is_signed();
        match statistics {
            Statistics::ByteArray(stats) if stats._internal_has_min_max_set() => {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::ByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            Statistics::FixedLenByteArray(stats)
                if (stats._internal_has_min_max_set() && self.can_truncate_value()) =>
            {
                let (min, did_truncate_min) = self.truncate_min_value(
                    self.props.statistics_truncate_length(),
                    stats.min_bytes_opt().unwrap(),
                );
                let (max, did_truncate_max) = self.truncate_max_value(
                    self.props.statistics_truncate_length(),
                    stats.max_bytes_opt().unwrap(),
                );
                Statistics::FixedLenByteArray(
                    ValueStatistics::new(
                        Some(min.into()),
                        Some(max.into()),
                        stats.distinct_count(),
                        stats.null_count_opt(),
                        backwards_compatible_min_max,
                    )
                    .with_max_is_exact(!did_truncate_max)
                    .with_min_is_exact(!did_truncate_min),
                )
            }
            stats => stats,
        }
    }

    fn add_data_page(&mut self) -> Result<()> {
        let values_data = self.encoder.flush_data_page()?;

        let max_def_level = self.descr.max_def_level();
        let max_rep_level = self.descr.max_rep_level();

        self.column_metrics.num_column_nulls += self.page_metrics.num_page_nulls;

        let page_statistics = match (values_data.min_value, values_data.max_value) {
            (Some(min), Some(max)) => {
                update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

                (self.statistics_enabled == EnabledStatistics::Page).then_some(
                    ValueStatistics::new(
                        Some(min),
                        Some(max),
                        None,
                        Some(self.page_metrics.num_page_nulls),
                        false,
                    ),
                )
            }
            _ => None,
        };

        self.update_column_offset_index(
            page_statistics.as_ref(),
            values_data.variable_length_bytes,
        );

        self.column_metrics
            .update_from_page_metrics(&self.page_metrics);
        self.column_metrics
            .update_variable_length_bytes(values_data.variable_length_bytes);

        let page_statistics = page_statistics.map(Statistics::from);
        let page_statistics = page_statistics.map(|stats| self.truncate_statistics(stats));

        let compressed_page = match self.props.writer_version() {
            WriterVersion::PARQUET_1_0 => {
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.rep_levels_sink[..],
                            max_rep_level,
                        )[..],
                    );
                }

                if max_def_level > 0 {
                    buffer.extend_from_slice(
                        &self.encode_levels_v1(
                            Encoding::RLE,
                            &self.def_levels_sink[..],
                            max_def_level,
                        )[..],
                    );
                }

                buffer.extend_from_slice(&values_data.buf);
                let uncompressed_size = buffer.len();

                if let Some(ref mut cmpr) = self.compressor {
                    let mut compressed_buf = Vec::with_capacity(uncompressed_size);
                    cmpr.compress(&buffer[..], &mut compressed_buf)?;
                    buffer = compressed_buf;
                }

                let data_page = Page::DataPage {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    def_level_encoding: Encoding::RLE,
                    rep_level_encoding: Encoding::RLE,
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
            WriterVersion::PARQUET_2_0 => {
                let mut rep_levels_byte_len = 0;
                let mut def_levels_byte_len = 0;
                let mut buffer = vec![];

                if max_rep_level > 0 {
                    let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
                    rep_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                if max_def_level > 0 {
                    let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
                    def_levels_byte_len = levels.len();
                    buffer.extend_from_slice(&levels[..]);
                }

                let uncompressed_size =
                    rep_levels_byte_len + def_levels_byte_len + values_data.buf.len();

                match self.compressor {
                    Some(ref mut cmpr) => {
                        cmpr.compress(&values_data.buf, &mut buffer)?;
                    }
                    None => buffer.extend_from_slice(&values_data.buf),
                }

                let data_page = Page::DataPageV2 {
                    buf: buffer.into(),
                    num_values: self.page_metrics.num_buffered_values,
                    encoding: values_data.encoding,
                    num_nulls: self.page_metrics.num_page_nulls as u32,
                    num_rows: self.page_metrics.num_buffered_rows,
                    def_levels_byte_len: def_levels_byte_len as u32,
                    rep_levels_byte_len: rep_levels_byte_len as u32,
                    is_compressed: self.compressor.is_some(),
                    statistics: page_statistics,
                };

                CompressedPage::new(data_page, uncompressed_size)
            }
        };

        if self.encoder.has_dictionary() {
            self.data_pages.push_back(compressed_page);
        } else {
            self.write_data_page(compressed_page)?;
        }

        self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;

        self.rep_levels_sink.clear();
        self.def_levels_sink.clear();
        self.page_metrics.new_page();

        Ok(())
    }

    #[inline]
    fn flush_data_pages(&mut self) -> Result<()> {
        if self.page_metrics.num_buffered_values > 0 {
            self.add_data_page()?;
        }

        while let Some(page) = self.data_pages.pop_front() {
            self.write_data_page(page)?;
        }

        Ok(())
    }

    fn build_column_metadata(&mut self) -> Result<ColumnChunkMetaData> {
        let total_compressed_size = self.column_metrics.total_compressed_size as i64;
        let total_uncompressed_size = self.column_metrics.total_uncompressed_size as i64;
        let num_values = self.column_metrics.total_num_values as i64;
        let dict_page_offset = self.column_metrics.dictionary_page_offset.map(|v| v as i64);
        let data_page_offset = self.column_metrics.data_page_offset.unwrap_or(0) as i64;

        let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
            .set_compression(self.codec)
            .set_encodings(self.encodings.iter().cloned().collect())
            .set_page_encoding_stats(self.encoding_stats.clone())
            .set_total_compressed_size(total_compressed_size)
            .set_total_uncompressed_size(total_uncompressed_size)
            .set_num_values(num_values)
            .set_data_page_offset(data_page_offset)
            .set_dictionary_page_offset(dict_page_offset);

        if self.statistics_enabled != EnabledStatistics::None {
            let backwards_compatible_min_max = self.descr.sort_order().is_signed();

            let statistics = ValueStatistics::<E::T>::new(
                self.column_metrics.min_column_value.clone(),
                self.column_metrics.max_column_value.clone(),
                self.column_metrics.column_distinct_count,
                Some(self.column_metrics.num_column_nulls),
                false,
            )
            .with_backwards_compatible_min_max(backwards_compatible_min_max)
            .into();

            let statistics = self.truncate_statistics(statistics);

            builder = builder
                .set_statistics(statistics)
                .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
                .set_repetition_level_histogram(
                    self.column_metrics.repetition_level_histogram.take(),
                )
                .set_definition_level_histogram(
                    self.column_metrics.definition_level_histogram.take(),
                );
        }

        builder = self.set_column_chunk_encryption_properties(builder);

        let metadata = builder.build()?;
        Ok(metadata)
    }

    #[inline]
    fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    #[inline]
    fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels);
        encoder.consume()
    }

    #[inline]
    fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
        self.encodings.insert(page.encoding());
        match self.encoding_stats.last_mut() {
            Some(encoding_stats)
                if encoding_stats.page_type == page.page_type()
                    && encoding_stats.encoding == page.encoding() =>
            {
                encoding_stats.count += 1;
            }
            _ => {
                self.encoding_stats.push(PageEncodingStats {
                    page_type: page.page_type(),
                    encoding: page.encoding(),
                    count: 1,
                });
            }
        }
        let page_spec = self.page_writer.write_page(page)?;
        if let Some(builder) = self.offset_index_builder.as_mut() {
            builder
                .append_offset_and_size(page_spec.offset as i64, page_spec.compressed_size as i32)
        }
        self.update_metrics_for_page(page_spec);
        Ok(())
    }

    #[inline]
    fn write_dictionary_page(&mut self) -> Result<()> {
        let compressed_page = {
            let mut page = self
                .encoder
                .flush_dict_page()?
                .ok_or_else(|| general_err!("Dictionary encoder is not set"))?;

            let uncompressed_size = page.buf.len();

            if let Some(ref mut cmpr) = self.compressor {
                let mut output_buf = Vec::with_capacity(uncompressed_size);
                cmpr.compress(&page.buf, &mut output_buf)?;
                page.buf = Bytes::from(output_buf);
            }

            let dict_page = Page::DictionaryPage {
                buf: page.buf,
                num_values: page.num_values as u32,
                encoding: self.props.dictionary_page_encoding(),
                is_sorted: page.is_sorted,
            };
            CompressedPage::new(dict_page, uncompressed_size)
        };

        self.encodings.insert(compressed_page.encoding());
        self.encoding_stats.push(PageEncodingStats {
            page_type: PageType::DICTIONARY_PAGE,
            encoding: compressed_page.encoding(),
            count: 1,
        });
        let page_spec = self.page_writer.write_page(compressed_page)?;
        self.update_metrics_for_page(page_spec);
        Ok(())
    }

    #[inline]
    fn update_metrics_for_page(&mut self, page_spec: PageWriteSpec) {
        self.column_metrics.total_uncompressed_size += page_spec.uncompressed_size as u64;
        self.column_metrics.total_compressed_size += page_spec.compressed_size as u64;
        self.column_metrics.total_bytes_written += page_spec.bytes_written;

        match page_spec.page_type {
            PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
                self.column_metrics.total_num_values += page_spec.num_values as u64;
                if self.column_metrics.data_page_offset.is_none() {
                    self.column_metrics.data_page_offset = Some(page_spec.offset);
                }
            }
            PageType::DICTIONARY_PAGE => {
                assert!(
                    self.column_metrics.dictionary_page_offset.is_none(),
                    "Dictionary offset is already set"
                );
                self.column_metrics.dictionary_page_offset = Some(page_spec.offset);
            }
            _ => {}
        }
    }

    #[inline]
    #[cfg(feature = "encryption")]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        if let Some(encryption_properties) = self.props.file_encryption_properties.as_ref() {
            builder.set_column_crypto_metadata(get_column_crypto_metadata(
                encryption_properties,
                &self.descr,
            ))
        } else {
            builder
        }
    }

    #[inline]
    #[cfg(not(feature = "encryption"))]
    fn set_column_chunk_encryption_properties(
        &self,
        builder: ColumnChunkMetaDataBuilder,
    ) -> ColumnChunkMetaDataBuilder {
        builder
    }
}

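// Helpers that update the running min/max statistic with `val`, honouring the
// column's sort order via `compare_greater`; NaN floating point values are
// skipped (see `is_nan` and `update_stat` below).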
fn update_min<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, min: &mut Option<T>) {
    update_stat::<T, _>(descr, val, min, |cur| compare_greater(descr, cur, val))
}

fn update_max<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T, max: &mut Option<T>) {
    update_stat::<T, _>(descr, val, max, |cur| compare_greater(descr, val, cur))
}

#[inline]
#[allow(clippy::eq_op)]
fn is_nan<T: ParquetValueType>(descr: &ColumnDescriptor, val: &T) -> bool {
    match T::PHYSICAL_TYPE {
        Type::FLOAT | Type::DOUBLE => val != val,
        Type::FIXED_LEN_BYTE_ARRAY if descr.logical_type() == Some(LogicalType::Float16) => {
            let val = val.as_bytes();
            let val = f16::from_le_bytes([val[0], val[1]]);
            val.is_nan()
        }
        _ => false,
    }
}

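/// Perform a conditional update of `cur` with `val`, skipping NaN values and
/// initialising `cur` if it is `None`.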
fn update_stat<T: ParquetValueType, F>(
    descr: &ColumnDescriptor,
    val: &T,
    cur: &mut Option<T>,
    should_update: F,
) where
    F: Fn(&T) -> bool,
{
    if is_nan(descr, val) {
        return;
    }

    if cur.as_ref().map_or(true, should_update) {
        *cur = Some(val.clone());
    }
}

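/// Evaluate `a > b` according to the column's logical/converted type: unsigned
/// integers compare as `u64`, decimals stored as byte arrays compare as signed
/// big-endian integers, and Float16 compares as `f16`.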
fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
    if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
        if !is_signed {
            return a.as_u64().unwrap() > b.as_u64().unwrap();
        }
    }

    match descr.converted_type() {
        ConvertedType::UINT_8
        | ConvertedType::UINT_16
        | ConvertedType::UINT_32
        | ConvertedType::UINT_64 => {
            return a.as_u64().unwrap() > b.as_u64().unwrap();
        }
        _ => {}
    };

    if let Some(LogicalType::Decimal { .. }) = descr.logical_type() {
        match T::PHYSICAL_TYPE {
            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            _ => {}
        };
    }

    if descr.converted_type() == ConvertedType::DECIMAL {
        match T::PHYSICAL_TYPE {
            Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => {
                return compare_greater_byte_array_decimals(a.as_bytes(), b.as_bytes());
            }
            _ => {}
        };
    };

    if let Some(LogicalType::Float16) = descr.logical_type() {
        let a = a.as_bytes();
        let a = f16::from_le_bytes([a[0], a[1]]);
        let b = b.as_bytes();
        let b = f16::from_le_bytes([b[0], b[1]]);
        return a > b;
    }

    a > b
}

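// Returns the fallback encoding for the given physical type and writer version,
// presumably used when dictionary encoding is not (or no longer) applicable.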
fn fallback_encoding(kind: Type, props: &WriterProperties) -> Encoding {
    match (kind, props.writer_version()) {
        (Type::BOOLEAN, WriterVersion::PARQUET_2_0) => Encoding::RLE,
        (Type::INT32, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
        (Type::INT64, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BINARY_PACKED,
        (Type::BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => Encoding::DELTA_BYTE_ARRAY,
        _ => Encoding::PLAIN,
    }
}

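// Returns whether the given physical type supports dictionary encoding for the
// configured writer version (booleans never do; FIXED_LEN_BYTE_ARRAY only from v2).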
fn has_dictionary_support(kind: Type, props: &WriterProperties) -> bool {
    match (kind, props.writer_version()) {
        (Type::BOOLEAN, _) => false,
        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_1_0) => false,
        (Type::FIXED_LEN_BYTE_ARRAY, WriterVersion::PARQUET_2_0) => true,
        _ => true,
    }
}

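/// Signed comparison of two big-endian, two's-complement decimal byte arrays of
/// possibly different lengths; shorter inputs are implicitly sign-extended.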
fn compare_greater_byte_array_decimals(a: &[u8], b: &[u8]) -> bool {
    let a_length = a.len();
    let b_length = b.len();

    if a_length == 0 || b_length == 0 {
        return a_length > 0;
    }

    let first_a: u8 = a[0];
    let first_b: u8 = b[0];

    if (0x80 & first_a) != (0x80 & first_b) || (a_length == b_length && first_a != first_b) {
        return (first_a as i8) > (first_b as i8);
    }

    let extension: u8 = if (first_a as i8) < 0 { 0xFF } else { 0 };

    if a_length != b_length {
        let not_equal = if a_length > b_length {
            let lead_length = a_length - b_length;
            a[0..lead_length].iter().any(|&x| x != extension)
        } else {
            let lead_length = b_length - a_length;
            b[0..lead_length].iter().any(|&x| x != extension)
        };

        if not_equal {
            let negative_values: bool = (first_a as i8) < 0;
            let a_longer: bool = a_length > b_length;
            return if negative_values { !a_longer } else { a_longer };
        }
    }

    (a[1..]) > (b[1..])
}

fn truncate_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let split = (1..=length).rfind(|x| data.is_char_boundary(*x))?;
    Some(data.as_bytes()[..split].to_vec())
}

fn truncate_and_increment_utf8(data: &str, length: usize) -> Option<Vec<u8>> {
    let lower_bound = length.saturating_sub(3);
    let split = (lower_bound..=length).rfind(|x| data.is_char_boundary(*x))?;
    increment_utf8(data.get(..split)?)
}

fn increment_utf8(data: &str) -> Option<Vec<u8>> {
    for (idx, original_char) in data.char_indices().rev() {
        let original_len = original_char.len_utf8();
        if let Some(next_char) = char::from_u32(original_char as u32 + 1) {
            if next_char.len_utf8() == original_len {
                let mut result = data.as_bytes()[..idx + original_len].to_vec();
                next_char.encode_utf8(&mut result[idx..]);
                return Some(result);
            }
        }
    }

    None
}

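/// Increment a binary value as an unsigned, big-endian integer, propagating the
/// carry from the last byte; returns `None` if every byte overflows. For example
/// (illustrative): `[0x00, 0xFF]` becomes `[0x01, 0x00]`, while `[0xFF, 0xFF]`
/// yields `None`.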
fn increment(mut data: Vec<u8>) -> Option<Vec<u8>> {
    for byte in data.iter_mut().rev() {
        let (incremented, overflow) = byte.overflowing_add(1);
        *byte = incremented;

        if !overflow {
            return Some(data);
        }
    }

    None
}

1588#[cfg(test)]
1589mod tests {
1590 use crate::{
1591 file::{properties::DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, writer::SerializedFileWriter},
1592 schema::parser::parse_message_type,
1593 };
1594 use core::str;
1595 use rand::distr::uniform::SampleUniform;
1596 use std::{fs::File, sync::Arc};
1597
1598 use crate::column::{
1599 page::PageReader,
1600 reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
1601 };
1602 use crate::file::writer::TrackedWrite;
1603 use crate::file::{
1604 properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
1605 };
1606 use crate::schema::types::{ColumnPath, Type as SchemaType};
1607 use crate::util::test_common::rand_gen::random_numbers_range;
1608
1609 use super::*;
1610
1611 #[test]
1612 fn test_column_writer_inconsistent_def_rep_length() {
1613 let page_writer = get_test_page_writer();
1614 let props = Default::default();
1615 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 1, props);
1616 let res = writer.write_batch(&[1, 2, 3, 4], Some(&[1, 1, 1]), Some(&[0, 0]));
1617 assert!(res.is_err());
1618 if let Err(err) = res {
1619 assert_eq!(
1620 format!("{err}"),
1621 "Parquet error: Inconsistent length of definition and repetition levels: 3 != 2"
1622 );
1623 }
1624 }
1625
1626 #[test]
1627 fn test_column_writer_invalid_def_levels() {
1628 let page_writer = get_test_page_writer();
1629 let props = Default::default();
1630 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1631 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1632 assert!(res.is_err());
1633 if let Err(err) = res {
1634 assert_eq!(
1635 format!("{err}"),
1636 "Parquet error: Definition levels are required, because max definition level = 1"
1637 );
1638 }
1639 }
1640
1641 #[test]
1642 fn test_column_writer_invalid_rep_levels() {
1643 let page_writer = get_test_page_writer();
1644 let props = Default::default();
1645 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 1, props);
1646 let res = writer.write_batch(&[1, 2, 3, 4], None, None);
1647 assert!(res.is_err());
1648 if let Err(err) = res {
1649 assert_eq!(
1650 format!("{err}"),
1651 "Parquet error: Repetition levels are required, because max repetition level = 1"
1652 );
1653 }
1654 }
1655
1656 #[test]
1657 fn test_column_writer_not_enough_values_to_write() {
1658 let page_writer = get_test_page_writer();
1659 let props = Default::default();
1660 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
1661 let res = writer.write_batch(&[1, 2], Some(&[1, 1, 1, 1]), None);
1662 assert!(res.is_err());
1663 if let Err(err) = res {
1664 assert_eq!(
1665 format!("{err}"),
1666 "Parquet error: Expected to write 4 values, but have only 2"
1667 );
1668 }
1669 }
1670
1671 #[test]
1672 fn test_column_writer_write_only_one_dictionary_page() {
1673 let page_writer = get_test_page_writer();
1674 let props = Default::default();
1675 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1676 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1677 writer.add_data_page().unwrap();
1679 writer.write_dictionary_page().unwrap();
1680 let err = writer.write_dictionary_page().unwrap_err().to_string();
1681 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1682 }
1683
1684 #[test]
1685 fn test_column_writer_error_when_writing_disabled_dictionary() {
1686 let page_writer = get_test_page_writer();
1687 let props = Arc::new(
1688 WriterProperties::builder()
1689 .set_dictionary_enabled(false)
1690 .build(),
1691 );
1692 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
1693 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
1694 let err = writer.write_dictionary_page().unwrap_err().to_string();
1695 assert_eq!(err, "Parquet error: Dictionary encoder is not set");
1696 }
1697
1698 #[test]
1699 fn test_column_writer_boolean_type_does_not_support_dictionary() {
1700 let page_writer = get_test_page_writer();
1701 let props = Arc::new(
1702 WriterProperties::builder()
1703 .set_dictionary_enabled(true)
1704 .build(),
1705 );
1706 let mut writer = get_test_column_writer::<BoolType>(page_writer, 0, 0, props);
1707 writer
1708 .write_batch(&[true, false, true, false], None, None)
1709 .unwrap();
1710
1711 let r = writer.close().unwrap();
1712 assert_eq!(r.bytes_written, 1);
1715 assert_eq!(r.rows_written, 4);
1716
1717 let metadata = r.metadata;
1718 assert_eq!(metadata.encodings(), &vec![Encoding::PLAIN, Encoding::RLE]);
        assert_eq!(metadata.num_values(), 4);
        assert_eq!(metadata.dictionary_page_offset(), None);
1721 }
1722
1723 #[test]
1724 fn test_column_writer_default_encoding_support_bool() {
1725 check_encoding_write_support::<BoolType>(
1726 WriterVersion::PARQUET_1_0,
1727 true,
1728 &[true, false],
1729 None,
1730 &[Encoding::PLAIN, Encoding::RLE],
1731 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1732 );
1733 check_encoding_write_support::<BoolType>(
1734 WriterVersion::PARQUET_1_0,
1735 false,
1736 &[true, false],
1737 None,
1738 &[Encoding::PLAIN, Encoding::RLE],
1739 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1740 );
1741 check_encoding_write_support::<BoolType>(
1742 WriterVersion::PARQUET_2_0,
1743 true,
1744 &[true, false],
1745 None,
1746 &[Encoding::RLE],
1747 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1748 );
1749 check_encoding_write_support::<BoolType>(
1750 WriterVersion::PARQUET_2_0,
1751 false,
1752 &[true, false],
1753 None,
1754 &[Encoding::RLE],
1755 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE, 1)],
1756 );
1757 }
1758
1759 #[test]
1760 fn test_column_writer_default_encoding_support_int32() {
1761 check_encoding_write_support::<Int32Type>(
1762 WriterVersion::PARQUET_1_0,
1763 true,
1764 &[1, 2],
1765 Some(0),
1766 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1767 &[
1768 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1769 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1770 ],
1771 );
1772 check_encoding_write_support::<Int32Type>(
1773 WriterVersion::PARQUET_1_0,
1774 false,
1775 &[1, 2],
1776 None,
1777 &[Encoding::PLAIN, Encoding::RLE],
1778 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1779 );
1780 check_encoding_write_support::<Int32Type>(
1781 WriterVersion::PARQUET_2_0,
1782 true,
1783 &[1, 2],
1784 Some(0),
1785 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1786 &[
1787 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1788 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1789 ],
1790 );
1791 check_encoding_write_support::<Int32Type>(
1792 WriterVersion::PARQUET_2_0,
1793 false,
1794 &[1, 2],
1795 None,
1796 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1797 &[encoding_stats(
1798 PageType::DATA_PAGE_V2,
1799 Encoding::DELTA_BINARY_PACKED,
1800 1,
1801 )],
1802 );
1803 }
1804
1805 #[test]
1806 fn test_column_writer_default_encoding_support_int64() {
1807 check_encoding_write_support::<Int64Type>(
1808 WriterVersion::PARQUET_1_0,
1809 true,
1810 &[1, 2],
1811 Some(0),
1812 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1813 &[
1814 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1815 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1816 ],
1817 );
1818 check_encoding_write_support::<Int64Type>(
1819 WriterVersion::PARQUET_1_0,
1820 false,
1821 &[1, 2],
1822 None,
1823 &[Encoding::PLAIN, Encoding::RLE],
1824 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1825 );
1826 check_encoding_write_support::<Int64Type>(
1827 WriterVersion::PARQUET_2_0,
1828 true,
1829 &[1, 2],
1830 Some(0),
1831 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1832 &[
1833 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1834 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1835 ],
1836 );
1837 check_encoding_write_support::<Int64Type>(
1838 WriterVersion::PARQUET_2_0,
1839 false,
1840 &[1, 2],
1841 None,
1842 &[Encoding::RLE, Encoding::DELTA_BINARY_PACKED],
1843 &[encoding_stats(
1844 PageType::DATA_PAGE_V2,
1845 Encoding::DELTA_BINARY_PACKED,
1846 1,
1847 )],
1848 );
1849 }
1850
1851 #[test]
1852 fn test_column_writer_default_encoding_support_int96() {
1853 check_encoding_write_support::<Int96Type>(
1854 WriterVersion::PARQUET_1_0,
1855 true,
1856 &[Int96::from(vec![1, 2, 3])],
1857 Some(0),
1858 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1859 &[
1860 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1861 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1862 ],
1863 );
1864 check_encoding_write_support::<Int96Type>(
1865 WriterVersion::PARQUET_1_0,
1866 false,
1867 &[Int96::from(vec![1, 2, 3])],
1868 None,
1869 &[Encoding::PLAIN, Encoding::RLE],
1870 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1871 );
1872 check_encoding_write_support::<Int96Type>(
1873 WriterVersion::PARQUET_2_0,
1874 true,
1875 &[Int96::from(vec![1, 2, 3])],
1876 Some(0),
1877 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1878 &[
1879 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1880 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1881 ],
1882 );
1883 check_encoding_write_support::<Int96Type>(
1884 WriterVersion::PARQUET_2_0,
1885 false,
1886 &[Int96::from(vec![1, 2, 3])],
1887 None,
1888 &[Encoding::PLAIN, Encoding::RLE],
1889 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1890 );
1891 }
1892
1893 #[test]
1894 fn test_column_writer_default_encoding_support_float() {
1895 check_encoding_write_support::<FloatType>(
1896 WriterVersion::PARQUET_1_0,
1897 true,
1898 &[1.0, 2.0],
1899 Some(0),
1900 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1901 &[
1902 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1903 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1904 ],
1905 );
1906 check_encoding_write_support::<FloatType>(
1907 WriterVersion::PARQUET_1_0,
1908 false,
1909 &[1.0, 2.0],
1910 None,
1911 &[Encoding::PLAIN, Encoding::RLE],
1912 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1913 );
1914 check_encoding_write_support::<FloatType>(
1915 WriterVersion::PARQUET_2_0,
1916 true,
1917 &[1.0, 2.0],
1918 Some(0),
1919 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1920 &[
1921 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1922 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1923 ],
1924 );
1925 check_encoding_write_support::<FloatType>(
1926 WriterVersion::PARQUET_2_0,
1927 false,
1928 &[1.0, 2.0],
1929 None,
1930 &[Encoding::PLAIN, Encoding::RLE],
1931 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1932 );
1933 }
1934
1935 #[test]
1936 fn test_column_writer_default_encoding_support_double() {
1937 check_encoding_write_support::<DoubleType>(
1938 WriterVersion::PARQUET_1_0,
1939 true,
1940 &[1.0, 2.0],
1941 Some(0),
1942 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1943 &[
1944 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1945 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1946 ],
1947 );
1948 check_encoding_write_support::<DoubleType>(
1949 WriterVersion::PARQUET_1_0,
1950 false,
1951 &[1.0, 2.0],
1952 None,
1953 &[Encoding::PLAIN, Encoding::RLE],
1954 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1955 );
1956 check_encoding_write_support::<DoubleType>(
1957 WriterVersion::PARQUET_2_0,
1958 true,
1959 &[1.0, 2.0],
1960 Some(0),
1961 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1962 &[
1963 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1964 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
1965 ],
1966 );
1967 check_encoding_write_support::<DoubleType>(
1968 WriterVersion::PARQUET_2_0,
1969 false,
1970 &[1.0, 2.0],
1971 None,
1972 &[Encoding::PLAIN, Encoding::RLE],
1973 &[encoding_stats(PageType::DATA_PAGE_V2, Encoding::PLAIN, 1)],
1974 );
1975 }
1976
1977 #[test]
1978 fn test_column_writer_default_encoding_support_byte_array() {
1979 check_encoding_write_support::<ByteArrayType>(
1980 WriterVersion::PARQUET_1_0,
1981 true,
1982 &[ByteArray::from(vec![1u8])],
1983 Some(0),
1984 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
1985 &[
1986 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
1987 encoding_stats(PageType::DATA_PAGE, Encoding::RLE_DICTIONARY, 1),
1988 ],
1989 );
1990 check_encoding_write_support::<ByteArrayType>(
1991 WriterVersion::PARQUET_1_0,
1992 false,
1993 &[ByteArray::from(vec![1u8])],
1994 None,
1995 &[Encoding::PLAIN, Encoding::RLE],
1996 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
1997 );
1998 check_encoding_write_support::<ByteArrayType>(
1999 WriterVersion::PARQUET_2_0,
2000 true,
2001 &[ByteArray::from(vec![1u8])],
2002 Some(0),
2003 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2004 &[
2005 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2006 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2007 ],
2008 );
2009 check_encoding_write_support::<ByteArrayType>(
2010 WriterVersion::PARQUET_2_0,
2011 false,
2012 &[ByteArray::from(vec![1u8])],
2013 None,
2014 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2015 &[encoding_stats(
2016 PageType::DATA_PAGE_V2,
2017 Encoding::DELTA_BYTE_ARRAY,
2018 1,
2019 )],
2020 );
2021 }
2022
2023 #[test]
2024 fn test_column_writer_default_encoding_support_fixed_len_byte_array() {
2025 check_encoding_write_support::<FixedLenByteArrayType>(
2026 WriterVersion::PARQUET_1_0,
2027 true,
2028 &[ByteArray::from(vec![1u8]).into()],
2029 None,
2030 &[Encoding::PLAIN, Encoding::RLE],
2031 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2032 );
2033 check_encoding_write_support::<FixedLenByteArrayType>(
2034 WriterVersion::PARQUET_1_0,
2035 false,
2036 &[ByteArray::from(vec![1u8]).into()],
2037 None,
2038 &[Encoding::PLAIN, Encoding::RLE],
2039 &[encoding_stats(PageType::DATA_PAGE, Encoding::PLAIN, 1)],
2040 );
2041 check_encoding_write_support::<FixedLenByteArrayType>(
2042 WriterVersion::PARQUET_2_0,
2043 true,
2044 &[ByteArray::from(vec![1u8]).into()],
2045 Some(0),
2046 &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY],
2047 &[
2048 encoding_stats(PageType::DICTIONARY_PAGE, Encoding::PLAIN, 1),
2049 encoding_stats(PageType::DATA_PAGE_V2, Encoding::RLE_DICTIONARY, 1),
2050 ],
2051 );
2052 check_encoding_write_support::<FixedLenByteArrayType>(
2053 WriterVersion::PARQUET_2_0,
2054 false,
2055 &[ByteArray::from(vec![1u8]).into()],
2056 None,
2057 &[Encoding::RLE, Encoding::DELTA_BYTE_ARRAY],
2058 &[encoding_stats(
2059 PageType::DATA_PAGE_V2,
2060 Encoding::DELTA_BYTE_ARRAY,
2061 1,
2062 )],
2063 );
2064 }
2065
2066 #[test]
2067 fn test_column_writer_check_metadata() {
2068 let page_writer = get_test_page_writer();
2069 let props = Default::default();
2070 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2071 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2072
2073 let r = writer.close().unwrap();
2074 assert_eq!(r.bytes_written, 20);
2075 assert_eq!(r.rows_written, 4);
2076
2077 let metadata = r.metadata;
2078 assert_eq!(
2079 metadata.encodings(),
2080 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2081 );
2082 assert_eq!(metadata.num_values(), 4);
2083 assert_eq!(metadata.compressed_size(), 20);
2084 assert_eq!(metadata.uncompressed_size(), 20);
2085 assert_eq!(metadata.data_page_offset(), 0);
2086 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2087 if let Some(stats) = metadata.statistics() {
2088 assert_eq!(stats.null_count_opt(), Some(0));
2089 assert_eq!(stats.distinct_count_opt(), None);
2090 if let Statistics::Int32(stats) = stats {
2091 assert_eq!(stats.min_opt().unwrap(), &1);
2092 assert_eq!(stats.max_opt().unwrap(), &4);
2093 } else {
2094 panic!("expecting Statistics::Int32");
2095 }
2096 } else {
2097 panic!("metadata missing statistics");
2098 }
2099 }
2100
2101 #[test]
2102 fn test_column_writer_check_byte_array_min_max() {
2103 let page_writer = get_test_page_writer();
2104 let props = Default::default();
2105 let mut writer = get_test_decimals_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
2106 writer
2107 .write_batch(
2108 &[
2109 ByteArray::from(vec![
2110 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2111 35u8, 231u8, 90u8, 0u8, 0u8,
2112 ]),
2113 ByteArray::from(vec![
2114 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 228u8, 62u8, 146u8,
2115 152u8, 177u8, 56u8, 0u8, 0u8,
2116 ]),
2117 ByteArray::from(vec![
2118 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8,
2119 0u8,
2120 ]),
2121 ByteArray::from(vec![
2122 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2123 44u8, 0u8, 0u8,
2124 ]),
2125 ],
2126 None,
2127 None,
2128 )
2129 .unwrap();
2130 let metadata = writer.close().unwrap().metadata;
2131 if let Some(stats) = metadata.statistics() {
2132 if let Statistics::ByteArray(stats) = stats {
2133 assert_eq!(
2134 stats.min_opt().unwrap(),
2135 &ByteArray::from(vec![
2136 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8,
2137 35u8, 231u8, 90u8, 0u8, 0u8,
2138 ])
2139 );
2140 assert_eq!(
2141 stats.max_opt().unwrap(),
2142 &ByteArray::from(vec![
2143 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 41u8, 162u8, 36u8, 26u8, 246u8,
2144 44u8, 0u8, 0u8,
2145 ])
2146 );
2147 } else {
2148 panic!("expecting Statistics::ByteArray");
2149 }
2150 } else {
2151 panic!("metadata missing statistics");
2152 }
2153 }
2154
2155 #[test]
2156 fn test_column_writer_uint32_converted_type_min_max() {
2157 let page_writer = get_test_page_writer();
2158 let props = Default::default();
2159 let mut writer = get_test_unsigned_int_given_as_converted_column_writer::<Int32Type>(
2160 page_writer,
2161 0,
2162 0,
2163 props,
2164 );
2165 writer.write_batch(&[0, 1, 2, 3, 4, 5], None, None).unwrap();
2166 let metadata = writer.close().unwrap().metadata;
2167 if let Some(stats) = metadata.statistics() {
2168 if let Statistics::Int32(stats) = stats {
2169 assert_eq!(stats.min_opt().unwrap(), &0,);
2170 assert_eq!(stats.max_opt().unwrap(), &5,);
2171 } else {
2172 panic!("expecting Statistics::Int32");
2173 }
2174 } else {
2175 panic!("metadata missing statistics");
2176 }
2177 }
2178
2179 #[test]
2180 fn test_column_writer_precalculated_statistics() {
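// statistics passed to write_batch_with_statistics are used as-is for the chunk,
// rather than being recomputed from the written values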
2181 let page_writer = get_test_page_writer();
2182 let props = Arc::new(
2183 WriterProperties::builder()
2184 .set_statistics_enabled(EnabledStatistics::Chunk)
2185 .build(),
2186 );
2187 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2188 writer
2189 .write_batch_with_statistics(
2190 &[1, 2, 3, 4],
2191 None,
2192 None,
2193 Some(&-17),
2194 Some(&9000),
2195 Some(55),
2196 )
2197 .unwrap();
2198
2199 let r = writer.close().unwrap();
2200 assert_eq!(r.bytes_written, 20);
2201 assert_eq!(r.rows_written, 4);
2202
2203 let metadata = r.metadata;
2204 assert_eq!(
2205 metadata.encodings(),
2206 &vec![Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY]
2207 );
2208 assert_eq!(metadata.num_values(), 4);
2209 assert_eq!(metadata.compressed_size(), 20);
2210 assert_eq!(metadata.uncompressed_size(), 20);
2211 assert_eq!(metadata.data_page_offset(), 0);
2212 assert_eq!(metadata.dictionary_page_offset(), Some(0));
2213 if let Some(stats) = metadata.statistics() {
2214 assert_eq!(stats.null_count_opt(), Some(0));
2215 assert_eq!(stats.distinct_count_opt().unwrap_or(0), 55);
2216 if let Statistics::Int32(stats) = stats {
2217 assert_eq!(stats.min_opt().unwrap(), &-17);
2218 assert_eq!(stats.max_opt().unwrap(), &9000);
2219 } else {
2220 panic!("expecting Statistics::Int32");
2221 }
2222 } else {
2223 panic!("metadata missing statistics");
2224 }
2225 }
2226
2227 #[test]
2228 fn test_mixed_precomputed_statistics() {
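// statistics computed from the first batch are merged with the precomputed statistics
// supplied with the second batch; the distinct count cannot be merged and is dropped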
2229 let mut buf = Vec::with_capacity(100);
2230 let mut write = TrackedWrite::new(&mut buf);
2231 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2232 let props = Default::default();
2233 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2234
2235 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2236 writer
2237 .write_batch_with_statistics(&[5, 6, 7], None, None, Some(&5), Some(&7), Some(3))
2238 .unwrap();
2239
2240 let r = writer.close().unwrap();
2241
2242 let stats = r.metadata.statistics().unwrap();
2243 assert_eq!(stats.min_bytes_opt().unwrap(), 1_i32.to_le_bytes());
2244 assert_eq!(stats.max_bytes_opt().unwrap(), 7_i32.to_le_bytes());
2245 assert_eq!(stats.null_count_opt(), Some(0));
2246 assert!(stats.distinct_count_opt().is_none());
2247
2248 drop(write);
2249
2250 let props = ReaderProperties::builder()
2251 .set_backward_compatible_lz4(false)
2252 .build();
2253 let reader = SerializedPageReader::new_with_properties(
2254 Arc::new(Bytes::from(buf)),
2255 &r.metadata,
2256 r.rows_written as usize,
2257 None,
2258 Arc::new(props),
2259 )
2260 .unwrap();
2261
2262 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2263 assert_eq!(pages.len(), 2);
2264
2265 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2266 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
2267
2268 let page_statistics = pages[1].statistics().unwrap();
2269 assert_eq!(
2270 page_statistics.min_bytes_opt().unwrap(),
2271 1_i32.to_le_bytes()
2272 );
2273 assert_eq!(
2274 page_statistics.max_bytes_opt().unwrap(),
2275 7_i32.to_le_bytes()
2276 );
2277 assert_eq!(page_statistics.null_count_opt(), Some(0));
2278 assert!(page_statistics.distinct_count_opt().is_none());
2279 }
2280
2281 #[test]
2282 fn test_disabled_statistics() {
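// with statistics disabled, neither the chunk metadata nor the data pages carry statistics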
2283 let mut buf = Vec::with_capacity(100);
2284 let mut write = TrackedWrite::new(&mut buf);
2285 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2286 let props = WriterProperties::builder()
2287 .set_statistics_enabled(EnabledStatistics::None)
2288 .set_writer_version(WriterVersion::PARQUET_2_0)
2289 .build();
2290 let props = Arc::new(props);
2291
2292 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
2293 writer
2294 .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
2295 .unwrap();
2296
2297 let r = writer.close().unwrap();
2298 assert!(r.metadata.statistics().is_none());
2299
2300 drop(write);
2301
2302 let props = ReaderProperties::builder()
2303 .set_backward_compatible_lz4(false)
2304 .build();
2305 let reader = SerializedPageReader::new_with_properties(
2306 Arc::new(Bytes::from(buf)),
2307 &r.metadata,
2308 r.rows_written as usize,
2309 None,
2310 Arc::new(props),
2311 )
2312 .unwrap();
2313
2314 let pages = reader.collect::<Result<Vec<_>>>().unwrap();
2315 assert_eq!(pages.len(), 2);
2316
2317 assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
2318 assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);
2319
2320 match &pages[1] {
2321 Page::DataPageV2 {
2322 num_values,
2323 num_nulls,
2324 num_rows,
2325 statistics,
2326 ..
2327 } => {
2328 assert_eq!(*num_values, 6);
2329 assert_eq!(*num_nulls, 2);
2330 assert_eq!(*num_rows, 6);
2331 assert!(statistics.is_none());
2332 }
2333 _ => unreachable!(),
2334 }
2335 }
2336
2337 #[test]
2338 fn test_column_writer_empty_column_roundtrip() {
2339 let props = Default::default();
2340 column_roundtrip::<Int32Type>(props, &[], None, None);
2341 }
2342
2343 #[test]
2344 fn test_column_writer_non_nullable_values_roundtrip() {
2345 let props = Default::default();
2346 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 0, 0);
2347 }
2348
2349 #[test]
2350 fn test_column_writer_nullable_non_repeated_values_roundtrip() {
2351 let props = Default::default();
2352 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 0);
2353 }
2354
2355 #[test]
2356 fn test_column_writer_nullable_repeated_values_roundtrip() {
2357 let props = Default::default();
2358 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2359 }
2360
2361 #[test]
2362 fn test_column_writer_dictionary_fallback_small_data_page() {
2363 let props = WriterProperties::builder()
2364 .set_dictionary_page_size_limit(32)
2365 .set_data_page_size_limit(32)
2366 .build();
2367 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2368 }
2369
2370 #[test]
2371 fn test_column_writer_small_write_batch_size() {
2372 for i in &[1usize, 2, 5, 10, 11, 1023] {
2373 let props = WriterProperties::builder().set_write_batch_size(*i).build();
2374
2375 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2376 }
2377 }
2378
2379 #[test]
2380 fn test_column_writer_dictionary_disabled_v1() {
2381 let props = WriterProperties::builder()
2382 .set_writer_version(WriterVersion::PARQUET_1_0)
2383 .set_dictionary_enabled(false)
2384 .build();
2385 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2386 }
2387
2388 #[test]
2389 fn test_column_writer_dictionary_disabled_v2() {
2390 let props = WriterProperties::builder()
2391 .set_writer_version(WriterVersion::PARQUET_2_0)
2392 .set_dictionary_enabled(false)
2393 .build();
2394 column_roundtrip_random::<Int32Type>(props, 1024, i32::MIN, i32::MAX, 10, 10);
2395 }
2396
2397 #[test]
2398 fn test_column_writer_compression_v1() {
2399 let props = WriterProperties::builder()
2400 .set_writer_version(WriterVersion::PARQUET_1_0)
2401 .set_compression(Compression::SNAPPY)
2402 .build();
2403 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2404 }
2405
2406 #[test]
2407 fn test_column_writer_compression_v2() {
2408 let props = WriterProperties::builder()
2409 .set_writer_version(WriterVersion::PARQUET_2_0)
2410 .set_compression(Compression::SNAPPY)
2411 .build();
2412 column_roundtrip_random::<Int32Type>(props, 2048, i32::MIN, i32::MAX, 10, 10);
2413 }
2414
2415 #[test]
2416 fn test_column_writer_add_data_pages_with_dict() {
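// a small data page size limit and write batch size force the dictionary-encoded values
// to be flushed into one dictionary page followed by two data pages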
2417 let mut file = tempfile::tempfile().unwrap();
2420 let mut write = TrackedWrite::new(&mut file);
2421 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
2422 let props = Arc::new(
2423 WriterProperties::builder()
2424 .set_data_page_size_limit(10)
2425 .set_write_batch_size(3)
.build(),
2427 );
2428 let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
2429 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2430 writer.write_batch(data, None, None).unwrap();
2431 let r = writer.close().unwrap();
2432
2433 drop(write);
2434
2435 let props = ReaderProperties::builder()
2437 .set_backward_compatible_lz4(false)
2438 .build();
2439 let mut page_reader = Box::new(
2440 SerializedPageReader::new_with_properties(
2441 Arc::new(file),
2442 &r.metadata,
2443 r.rows_written as usize,
2444 None,
2445 Arc::new(props),
2446 )
2447 .unwrap(),
2448 );
2449 let mut res = Vec::new();
2450 while let Some(page) = page_reader.get_next_page().unwrap() {
2451 res.push((page.page_type(), page.num_values(), page.buffer().len()));
2452 }
2453 assert_eq!(
2454 res,
2455 vec![
2456 (PageType::DICTIONARY_PAGE, 10, 40),
2457 (PageType::DATA_PAGE, 9, 10),
2458 (PageType::DATA_PAGE, 1, 3),
2459 ]
2460 );
2461 assert_eq!(
2462 r.metadata.page_encoding_stats(),
2463 Some(&vec![
2464 PageEncodingStats {
2465 page_type: PageType::DICTIONARY_PAGE,
2466 encoding: Encoding::PLAIN,
2467 count: 1
2468 },
2469 PageEncodingStats {
2470 page_type: PageType::DATA_PAGE,
2471 encoding: Encoding::RLE_DICTIONARY,
2472 count: 2,
2473 }
2474 ])
2475 );
2476 }
2477
2478 #[test]
2479 fn test_bool_statistics() {
2480 let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
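// BOOLEAN columns do not use a signed sort order, so their min/max are not
// backwards compatible with the deprecated `min`/`max` statistics fields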
2481 assert!(!stats.is_min_max_backwards_compatible());
2484 if let Statistics::Boolean(stats) = stats {
2485 assert_eq!(stats.min_opt().unwrap(), &false);
2486 assert_eq!(stats.max_opt().unwrap(), &true);
2487 } else {
2488 panic!("expecting Statistics::Boolean, got {stats:?}");
2489 }
2490 }
2491
2492 #[test]
2493 fn test_int32_statistics() {
2494 let stats = statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]);
2495 assert!(stats.is_min_max_backwards_compatible());
2496 if let Statistics::Int32(stats) = stats {
2497 assert_eq!(stats.min_opt().unwrap(), &-2);
2498 assert_eq!(stats.max_opt().unwrap(), &3);
2499 } else {
2500 panic!("expecting Statistics::Int32, got {stats:?}");
2501 }
2502 }
2503
2504 #[test]
2505 fn test_int64_statistics() {
2506 let stats = statistics_roundtrip::<Int64Type>(&[-1, 3, -2, 2]);
2507 assert!(stats.is_min_max_backwards_compatible());
2508 if let Statistics::Int64(stats) = stats {
2509 assert_eq!(stats.min_opt().unwrap(), &-2);
2510 assert_eq!(stats.max_opt().unwrap(), &3);
2511 } else {
2512 panic!("expecting Statistics::Int64, got {stats:?}");
2513 }
2514 }
2515
2516 #[test]
2517 fn test_int96_statistics() {
2518 let input = vec![
2519 Int96::from(vec![1, 20, 30]),
2520 Int96::from(vec![3, 20, 10]),
2521 Int96::from(vec![0, 20, 30]),
2522 Int96::from(vec![2, 20, 30]),
2523 ]
2524 .into_iter()
2525 .collect::<Vec<Int96>>();
2526
2527 let stats = statistics_roundtrip::<Int96Type>(&input);
2528 assert!(!stats.is_min_max_backwards_compatible());
2529 if let Statistics::Int96(stats) = stats {
2530 assert_eq!(stats.min_opt().unwrap(), &Int96::from(vec![0, 20, 30]));
2531 assert_eq!(stats.max_opt().unwrap(), &Int96::from(vec![3, 20, 10]));
2532 } else {
2533 panic!("expecting Statistics::Int96, got {stats:?}");
2534 }
2535 }
2536
2537 #[test]
2538 fn test_float_statistics() {
2539 let stats = statistics_roundtrip::<FloatType>(&[-1.0, 3.0, -2.0, 2.0]);
2540 assert!(stats.is_min_max_backwards_compatible());
2541 if let Statistics::Float(stats) = stats {
2542 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2543 assert_eq!(stats.max_opt().unwrap(), &3.0);
2544 } else {
2545 panic!("expecting Statistics::Float, got {stats:?}");
2546 }
2547 }
2548
2549 #[test]
2550 fn test_double_statistics() {
2551 let stats = statistics_roundtrip::<DoubleType>(&[-1.0, 3.0, -2.0, 2.0]);
2552 assert!(stats.is_min_max_backwards_compatible());
2553 if let Statistics::Double(stats) = stats {
2554 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2555 assert_eq!(stats.max_opt().unwrap(), &3.0);
2556 } else {
2557 panic!("expecting Statistics::Double, got {stats:?}");
2558 }
2559 }
2560
2561 #[test]
2562 fn test_byte_array_statistics() {
2563 let input = ["aawaa", "zz", "aaw", "m", "qrs"]
2564 .iter()
2565 .map(|&s| s.into())
2566 .collect::<Vec<_>>();
2567
2568 let stats = statistics_roundtrip::<ByteArrayType>(&input);
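// BYTE_ARRAY min/max use unsigned, lexicographic comparison and are therefore not
// backwards compatible with the deprecated (signed) `min`/`max` statistics fields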
2569 assert!(!stats.is_min_max_backwards_compatible());
2570 if let Statistics::ByteArray(stats) = stats {
2571 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from("aaw"));
2572 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from("zz"));
2573 } else {
2574 panic!("expecting Statistics::ByteArray, got {stats:?}");
2575 }
2576 }
2577
2578 #[test]
2579 fn test_fixed_len_byte_array_statistics() {
2580 let input = ["aawaa", "zz ", "aaw ", "m ", "qrs "]
2581 .iter()
2582 .map(|&s| ByteArray::from(s).into())
2583 .collect::<Vec<_>>();
2584
2585 let stats = statistics_roundtrip::<FixedLenByteArrayType>(&input);
2586 assert!(!stats.is_min_max_backwards_compatible());
2587 if let Statistics::FixedLenByteArray(stats) = stats {
2588 let expected_min: FixedLenByteArray = ByteArray::from("aaw ").into();
2589 assert_eq!(stats.min_opt().unwrap(), &expected_min);
2590 let expected_max: FixedLenByteArray = ByteArray::from("zz ").into();
2591 assert_eq!(stats.max_opt().unwrap(), &expected_max);
2592 } else {
2593 panic!("expecting Statistics::FixedLenByteArray, got {stats:?}");
2594 }
2595 }
2596
2597 #[test]
2598 fn test_column_writer_check_float16_min_max() {
2599 let input = [
2600 -f16::ONE,
2601 f16::from_f32(3.0),
2602 -f16::from_f32(2.0),
2603 f16::from_f32(2.0),
2604 ]
2605 .into_iter()
2606 .map(|s| ByteArray::from(s).into())
2607 .collect::<Vec<_>>();
2608
2609 let stats = float16_statistics_roundtrip(&input);
2610 assert!(stats.is_min_max_backwards_compatible());
2611 assert_eq!(
2612 stats.min_opt().unwrap(),
2613 &ByteArray::from(-f16::from_f32(2.0))
2614 );
2615 assert_eq!(
2616 stats.max_opt().unwrap(),
2617 &ByteArray::from(f16::from_f32(3.0))
2618 );
2619 }
2620
2621 #[test]
2622 fn test_column_writer_check_float16_nan_middle() {
2623 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2624 .into_iter()
2625 .map(|s| ByteArray::from(s).into())
2626 .collect::<Vec<_>>();
2627
2628 let stats = float16_statistics_roundtrip(&input);
2629 assert!(stats.is_min_max_backwards_compatible());
2630 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2631 assert_eq!(
2632 stats.max_opt().unwrap(),
2633 &ByteArray::from(f16::ONE + f16::ONE)
2634 );
2635 }
2636
2637 #[test]
2638 fn test_float16_statistics_nan_middle() {
2639 let input = [f16::ONE, f16::NAN, f16::ONE + f16::ONE]
2640 .into_iter()
2641 .map(|s| ByteArray::from(s).into())
2642 .collect::<Vec<_>>();
2643
2644 let stats = float16_statistics_roundtrip(&input);
2645 assert!(stats.is_min_max_backwards_compatible());
2646 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2647 assert_eq!(
2648 stats.max_opt().unwrap(),
2649 &ByteArray::from(f16::ONE + f16::ONE)
2650 );
2651 }
2652
2653 #[test]
2654 fn test_float16_statistics_nan_start() {
2655 let input = [f16::NAN, f16::ONE, f16::ONE + f16::ONE]
2656 .into_iter()
2657 .map(|s| ByteArray::from(s).into())
2658 .collect::<Vec<_>>();
2659
2660 let stats = float16_statistics_roundtrip(&input);
2661 assert!(stats.is_min_max_backwards_compatible());
2662 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::ONE));
2663 assert_eq!(
2664 stats.max_opt().unwrap(),
2665 &ByteArray::from(f16::ONE + f16::ONE)
2666 );
2667 }
2668
2669 #[test]
2670 fn test_float16_statistics_nan_only() {
2671 let input = [f16::NAN, f16::NAN]
2672 .into_iter()
2673 .map(|s| ByteArray::from(s).into())
2674 .collect::<Vec<_>>();
2675
2676 let stats = float16_statistics_roundtrip(&input);
2677 assert!(stats.min_bytes_opt().is_none());
2678 assert!(stats.max_bytes_opt().is_none());
2679 assert!(stats.is_min_max_backwards_compatible());
2680 }
2681
2682 #[test]
2683 fn test_float16_statistics_zero_only() {
2684 let input = [f16::ZERO]
2685 .into_iter()
2686 .map(|s| ByteArray::from(s).into())
2687 .collect::<Vec<_>>();
2688
2689 let stats = float16_statistics_roundtrip(&input);
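// a zero bound is widened so that the min is always -0.0 and the max is always +0.0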
2690 assert!(stats.is_min_max_backwards_compatible());
2691 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2692 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2693 }
2694
2695 #[test]
2696 fn test_float16_statistics_neg_zero_only() {
2697 let input = [f16::NEG_ZERO]
2698 .into_iter()
2699 .map(|s| ByteArray::from(s).into())
2700 .collect::<Vec<_>>();
2701
2702 let stats = float16_statistics_roundtrip(&input);
2703 assert!(stats.is_min_max_backwards_compatible());
2704 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2705 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2706 }
2707
2708 #[test]
2709 fn test_float16_statistics_zero_min() {
2710 let input = [f16::ZERO, f16::ONE, f16::NAN, f16::PI]
2711 .into_iter()
2712 .map(|s| ByteArray::from(s).into())
2713 .collect::<Vec<_>>();
2714
2715 let stats = float16_statistics_roundtrip(&input);
2716 assert!(stats.is_min_max_backwards_compatible());
2717 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(f16::NEG_ZERO));
2718 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::PI));
2719 }
2720
2721 #[test]
2722 fn test_float16_statistics_neg_zero_max() {
2723 let input = [f16::NEG_ZERO, f16::NEG_ONE, f16::NAN, -f16::PI]
2724 .into_iter()
2725 .map(|s| ByteArray::from(s).into())
2726 .collect::<Vec<_>>();
2727
2728 let stats = float16_statistics_roundtrip(&input);
2729 assert!(stats.is_min_max_backwards_compatible());
2730 assert_eq!(stats.min_opt().unwrap(), &ByteArray::from(-f16::PI));
2731 assert_eq!(stats.max_opt().unwrap(), &ByteArray::from(f16::ZERO));
2732 }
2733
2734 #[test]
2735 fn test_float_statistics_nan_middle() {
2736 let stats = statistics_roundtrip::<FloatType>(&[1.0, f32::NAN, 2.0]);
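// NaN values are ignored when computing min/max statistics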
2737 assert!(stats.is_min_max_backwards_compatible());
2738 if let Statistics::Float(stats) = stats {
2739 assert_eq!(stats.min_opt().unwrap(), &1.0);
2740 assert_eq!(stats.max_opt().unwrap(), &2.0);
2741 } else {
2742 panic!("expecting Statistics::Float");
2743 }
2744 }
2745
2746 #[test]
2747 fn test_float_statistics_nan_start() {
2748 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, 1.0, 2.0]);
2749 assert!(stats.is_min_max_backwards_compatible());
2750 if let Statistics::Float(stats) = stats {
2751 assert_eq!(stats.min_opt().unwrap(), &1.0);
2752 assert_eq!(stats.max_opt().unwrap(), &2.0);
2753 } else {
2754 panic!("expecting Statistics::Float");
2755 }
2756 }
2757
2758 #[test]
2759 fn test_float_statistics_nan_only() {
2760 let stats = statistics_roundtrip::<FloatType>(&[f32::NAN, f32::NAN]);
2761 assert!(stats.min_bytes_opt().is_none());
2762 assert!(stats.max_bytes_opt().is_none());
2763 assert!(stats.is_min_max_backwards_compatible());
2764 assert!(matches!(stats, Statistics::Float(_)));
2765 }
2766
2767 #[test]
2768 fn test_float_statistics_zero_only() {
2769 let stats = statistics_roundtrip::<FloatType>(&[0.0]);
2770 assert!(stats.is_min_max_backwards_compatible());
2771 if let Statistics::Float(stats) = stats {
2772 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2773 assert!(stats.min_opt().unwrap().is_sign_negative());
2774 assert_eq!(stats.max_opt().unwrap(), &0.0);
2775 assert!(stats.max_opt().unwrap().is_sign_positive());
2776 } else {
2777 panic!("expecting Statistics::Float");
2778 }
2779 }
2780
2781 #[test]
2782 fn test_float_statistics_neg_zero_only() {
2783 let stats = statistics_roundtrip::<FloatType>(&[-0.0]);
2784 assert!(stats.is_min_max_backwards_compatible());
2785 if let Statistics::Float(stats) = stats {
2786 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2787 assert!(stats.min_opt().unwrap().is_sign_negative());
2788 assert_eq!(stats.max_opt().unwrap(), &0.0);
2789 assert!(stats.max_opt().unwrap().is_sign_positive());
2790 } else {
2791 panic!("expecting Statistics::Float");
2792 }
2793 }
2794
2795 #[test]
2796 fn test_float_statistics_zero_min() {
2797 let stats = statistics_roundtrip::<FloatType>(&[0.0, 1.0, f32::NAN, 2.0]);
2798 assert!(stats.is_min_max_backwards_compatible());
2799 if let Statistics::Float(stats) = stats {
2800 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2801 assert!(stats.min_opt().unwrap().is_sign_negative());
2802 assert_eq!(stats.max_opt().unwrap(), &2.0);
2803 } else {
2804 panic!("expecting Statistics::Float");
2805 }
2806 }
2807
2808 #[test]
2809 fn test_float_statistics_neg_zero_max() {
2810 let stats = statistics_roundtrip::<FloatType>(&[-0.0, -1.0, f32::NAN, -2.0]);
2811 assert!(stats.is_min_max_backwards_compatible());
2812 if let Statistics::Float(stats) = stats {
2813 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2814 assert_eq!(stats.max_opt().unwrap(), &0.0);
2815 assert!(stats.max_opt().unwrap().is_sign_positive());
2816 } else {
2817 panic!("expecting Statistics::Float");
2818 }
2819 }
2820
2821 #[test]
2822 fn test_double_statistics_nan_middle() {
2823 let stats = statistics_roundtrip::<DoubleType>(&[1.0, f64::NAN, 2.0]);
2824 assert!(stats.is_min_max_backwards_compatible());
2825 if let Statistics::Double(stats) = stats {
2826 assert_eq!(stats.min_opt().unwrap(), &1.0);
2827 assert_eq!(stats.max_opt().unwrap(), &2.0);
2828 } else {
2829 panic!("expecting Statistics::Double");
2830 }
2831 }
2832
2833 #[test]
2834 fn test_double_statistics_nan_start() {
2835 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, 1.0, 2.0]);
2836 assert!(stats.is_min_max_backwards_compatible());
2837 if let Statistics::Double(stats) = stats {
2838 assert_eq!(stats.min_opt().unwrap(), &1.0);
2839 assert_eq!(stats.max_opt().unwrap(), &2.0);
2840 } else {
2841 panic!("expecting Statistics::Double");
2842 }
2843 }
2844
2845 #[test]
2846 fn test_double_statistics_nan_only() {
2847 let stats = statistics_roundtrip::<DoubleType>(&[f64::NAN, f64::NAN]);
2848 assert!(stats.min_bytes_opt().is_none());
2849 assert!(stats.max_bytes_opt().is_none());
2850 assert!(matches!(stats, Statistics::Double(_)));
2851 assert!(stats.is_min_max_backwards_compatible());
2852 }
2853
2854 #[test]
2855 fn test_double_statistics_zero_only() {
2856 let stats = statistics_roundtrip::<DoubleType>(&[0.0]);
2857 assert!(stats.is_min_max_backwards_compatible());
2858 if let Statistics::Double(stats) = stats {
2859 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2860 assert!(stats.min_opt().unwrap().is_sign_negative());
2861 assert_eq!(stats.max_opt().unwrap(), &0.0);
2862 assert!(stats.max_opt().unwrap().is_sign_positive());
2863 } else {
2864 panic!("expecting Statistics::Double");
2865 }
2866 }
2867
2868 #[test]
2869 fn test_double_statistics_neg_zero_only() {
2870 let stats = statistics_roundtrip::<DoubleType>(&[-0.0]);
2871 assert!(stats.is_min_max_backwards_compatible());
2872 if let Statistics::Double(stats) = stats {
2873 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2874 assert!(stats.min_opt().unwrap().is_sign_negative());
2875 assert_eq!(stats.max_opt().unwrap(), &0.0);
2876 assert!(stats.max_opt().unwrap().is_sign_positive());
2877 } else {
2878 panic!("expecting Statistics::Double");
2879 }
2880 }
2881
2882 #[test]
2883 fn test_double_statistics_zero_min() {
2884 let stats = statistics_roundtrip::<DoubleType>(&[0.0, 1.0, f64::NAN, 2.0]);
2885 assert!(stats.is_min_max_backwards_compatible());
2886 if let Statistics::Double(stats) = stats {
2887 assert_eq!(stats.min_opt().unwrap(), &-0.0);
2888 assert!(stats.min_opt().unwrap().is_sign_negative());
2889 assert_eq!(stats.max_opt().unwrap(), &2.0);
2890 } else {
2891 panic!("expecting Statistics::Double");
2892 }
2893 }
2894
2895 #[test]
2896 fn test_double_statistics_neg_zero_max() {
2897 let stats = statistics_roundtrip::<DoubleType>(&[-0.0, -1.0, f64::NAN, -2.0]);
2898 assert!(stats.is_min_max_backwards_compatible());
2899 if let Statistics::Double(stats) = stats {
2900 assert_eq!(stats.min_opt().unwrap(), &-2.0);
2901 assert_eq!(stats.max_opt().unwrap(), &0.0);
2902 assert!(stats.max_opt().unwrap().is_sign_positive());
2903 } else {
2904 panic!("expecting Statistics::Double");
2905 }
2906 }
2907
2908 #[test]
2909 fn test_compare_greater_byte_array_decimals() {
2910 assert!(!compare_greater_byte_array_decimals(&[], &[],),);
2911 assert!(compare_greater_byte_array_decimals(&[1u8,], &[],),);
2912 assert!(!compare_greater_byte_array_decimals(&[], &[1u8,],),);
2913 assert!(compare_greater_byte_array_decimals(&[1u8,], &[0u8,],),);
2914 assert!(!compare_greater_byte_array_decimals(&[1u8,], &[1u8,],),);
2915 assert!(compare_greater_byte_array_decimals(&[1u8, 0u8,], &[0u8,],),);
2916 assert!(!compare_greater_byte_array_decimals(
2917 &[0u8, 1u8,],
2918 &[1u8, 0u8,],
2919 ),);
2920 assert!(!compare_greater_byte_array_decimals(
2921 &[255u8, 35u8, 0u8, 0u8,],
2922 &[0u8,],
2923 ),);
2924 assert!(compare_greater_byte_array_decimals(
2925 &[0u8,],
2926 &[255u8, 35u8, 0u8, 0u8,],
2927 ),);
2928 }
2929
2930 #[test]
2931 fn test_column_index_with_null_pages() {
2932 let page_writer = get_test_page_writer();
2934 let props = Default::default();
2935 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
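// all four values are null (definition level 0 < max level 1), so the single data page
// is recorded as a null page in the column index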
2936 writer.write_batch(&[], Some(&[0, 0, 0, 0]), None).unwrap();
2937
2938 let r = writer.close().unwrap();
2939 assert!(r.column_index.is_some());
2940 let col_idx = r.column_index.unwrap();
2941 assert!(col_idx.null_pages[0]);
2943 assert_eq!(col_idx.min_values[0].len(), 0);
2945 assert_eq!(col_idx.max_values[0].len(), 0);
2946 assert!(col_idx.null_counts.is_some());
2948 assert_eq!(col_idx.null_counts.as_ref().unwrap()[0], 4);
2949 assert!(col_idx.repetition_level_histograms.is_none());
2951 assert!(col_idx.definition_level_histograms.is_some());
2953 assert_eq!(col_idx.definition_level_histograms.unwrap(), &[4, 0]);
2954 }
2955
2956 #[test]
2957 fn test_column_offset_index_metadata() {
2958 let page_writer = get_test_page_writer();
2961 let props = Default::default();
2962 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
2963 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
2964 writer.flush_data_pages().unwrap();
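// a second batch after the flush produces a second data page and therefore a second
// entry in the column index and offset index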
2966 writer.write_batch(&[4, 8, 2, -5], None, None).unwrap();
2968
2969 let r = writer.close().unwrap();
2970 let column_index = r.column_index.unwrap();
2971 let offset_index = r.offset_index.unwrap();
2972
2973 assert_eq!(8, r.rows_written);
2974
2975 assert_eq!(2, column_index.null_pages.len());
2977 assert_eq!(2, offset_index.page_locations.len());
2978 assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
2979 for idx in 0..2 {
2980 assert!(!column_index.null_pages[idx]);
2981 assert_eq!(0, column_index.null_counts.as_ref().unwrap()[idx]);
2982 }
2983
2984 if let Some(stats) = r.metadata.statistics() {
2985 assert_eq!(stats.null_count_opt(), Some(0));
2986 assert_eq!(stats.distinct_count_opt(), None);
2987 if let Statistics::Int32(stats) = stats {
2988 assert_eq!(
2992 stats.min_bytes_opt(),
2993 Some(column_index.min_values[1].as_slice())
2994 );
2995 assert_eq!(
2996 stats.max_bytes_opt(),
2997 column_index.max_values.get(1).map(Vec::as_slice)
2998 );
2999 } else {
3000 panic!("expecting Statistics::Int32");
3001 }
3002 } else {
3003 panic!("metadata missing statistics");
3004 }
3005
3006 assert_eq!(0, offset_index.page_locations[0].first_row_index);
3008 assert_eq!(4, offset_index.page_locations[1].first_row_index);
3009 }
3010
3011 #[test]
3013 fn test_column_offset_index_metadata_truncating() {
3014 let page_writer = get_test_page_writer();
3017 let props = Default::default();
3018 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3019
3020 let mut data = vec![FixedLenByteArray::default(); 3];
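// three 200-byte values ('a', 'p' and 'b' repeated), longer than the default
// column index truncate length of 64 bytes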
3021 data[0].set_data(Bytes::from(vec![97_u8; 200]));
3023 data[1].set_data(Bytes::from(vec![112_u8; 200]));
3025 data[2].set_data(Bytes::from(vec![98_u8; 200]));
3026
3027 writer.write_batch(&data, None, None).unwrap();
3028
3029 writer.flush_data_pages().unwrap();
3030
3031 let r = writer.close().unwrap();
3032 let column_index = r.column_index.unwrap();
3033 let offset_index = r.offset_index.unwrap();
3034
3035 assert_eq!(3, r.rows_written);
3036
3037 assert_eq!(1, column_index.null_pages.len());
3039 assert_eq!(1, offset_index.page_locations.len());
3040 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3041 assert!(!column_index.null_pages[0]);
3042 assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3043
3044 if let Some(stats) = r.metadata.statistics() {
3045 assert_eq!(stats.null_count_opt(), Some(0));
3046 assert_eq!(stats.distinct_count_opt(), None);
3047 if let Statistics::FixedLenByteArray(stats) = stats {
3048 let column_index_min_value = &column_index.min_values[0];
3049 let column_index_max_value = &column_index.max_values[0];
3050
3051 assert_ne!(
3053 stats.min_bytes_opt(),
3054 Some(column_index_min_value.as_slice())
3055 );
3056 assert_ne!(
3057 stats.max_bytes_opt(),
3058 Some(column_index_max_value.as_slice())
3059 );
3060
3061 assert_eq!(
3062 column_index_min_value.len(),
3063 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3064 );
3065 assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
3066 assert_eq!(
3067 column_index_max_value.len(),
3068 DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
3069 );
3070
3071 assert_eq!(
3073 *column_index_max_value.last().unwrap(),
3074 *column_index_max_value.first().unwrap() + 1
3075 );
3076 } else {
3077 panic!("expecting Statistics::FixedLenByteArray");
3078 }
3079 } else {
3080 panic!("metadata missing statistics");
3081 }
3082 }
3083
3084 #[test]
3085 fn test_column_offset_index_truncating_spec_example() {
3086 let page_writer = get_test_page_writer();
3089
3090 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3092 let props = Arc::new(builder.build());
3093 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3094
3095 let mut data = vec![FixedLenByteArray::default(); 1];
3096 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
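// "Blart Versenwald III" is the truncation example from the Parquet spec: with a
// truncate length of 1 the column index min becomes "B" and the max is incremented to "C"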
3098
3099 writer.write_batch(&data, None, None).unwrap();
3100
3101 writer.flush_data_pages().unwrap();
3102
3103 let r = writer.close().unwrap();
3104 let column_index = r.column_index.unwrap();
3105 let offset_index = r.offset_index.unwrap();
3106
3107 assert_eq!(1, r.rows_written);
3108
3109 assert_eq!(1, column_index.null_pages.len());
3111 assert_eq!(1, offset_index.page_locations.len());
3112 assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
3113 assert!(!column_index.null_pages[0]);
3114 assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
3115
3116 if let Some(stats) = r.metadata.statistics() {
3117 assert_eq!(stats.null_count_opt(), Some(0));
3118 assert_eq!(stats.distinct_count_opt(), None);
3119 if let Statistics::FixedLenByteArray(_stats) = stats {
3120 let column_index_min_value = &column_index.min_values[0];
3121 let column_index_max_value = &column_index.max_values[0];
3122
3123 assert_eq!(column_index_min_value.len(), 1);
3124 assert_eq!(column_index_max_value.len(), 1);
3125
3126 assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
3127 assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
3128
3129 assert_ne!(column_index_min_value, stats.min_bytes_opt().unwrap());
3130 assert_ne!(column_index_max_value, stats.max_bytes_opt().unwrap());
3131 } else {
3132 panic!("expecting Statistics::FixedLenByteArray");
3133 }
3134 } else {
3135 panic!("metadata missing statistics");
3136 }
3137 }
3138
3139 #[test]
3140 fn test_float16_min_max_no_truncation() {
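// FLOAT16 min/max values must not be truncated in the column index,
// even with a 1-byte truncate length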
3141 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3143 let props = Arc::new(builder.build());
3144 let page_writer = get_test_page_writer();
3145 let mut writer = get_test_float16_column_writer(page_writer, props);
3146
3147 let expected_value = f16::PI.to_le_bytes().to_vec();
3148 let data = vec![ByteArray::from(expected_value.clone()).into()];
3149 writer.write_batch(&data, None, None).unwrap();
3150 writer.flush_data_pages().unwrap();
3151
3152 let r = writer.close().unwrap();
3153
3154 let column_index = r.column_index.unwrap();
3157 let column_index_min_bytes = column_index.min_values[0].as_slice();
3158 let column_index_max_bytes = column_index.max_values[0].as_slice();
3159 assert_eq!(expected_value, column_index_min_bytes);
3160 assert_eq!(expected_value, column_index_max_bytes);
3161
3162 let stats = r.metadata.statistics().unwrap();
3164 if let Statistics::FixedLenByteArray(stats) = stats {
3165 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3166 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3167 assert_eq!(expected_value, stats_min_bytes);
3168 assert_eq!(expected_value, stats_max_bytes);
3169 } else {
3170 panic!("expecting Statistics::FixedLenByteArray");
3171 }
3172 }
3173
3174 #[test]
3175 fn test_decimal_min_max_no_truncation() {
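// decimal min/max values must not be truncated in the column index,
// even with a 1-byte truncate length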
3176 let builder = WriterProperties::builder().set_column_index_truncate_length(Some(1));
3178 let props = Arc::new(builder.build());
3179 let page_writer = get_test_page_writer();
3180 let mut writer =
3181 get_test_decimals_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3182
3183 let expected_value = vec![
3184 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 255u8, 179u8, 172u8, 19u8, 35u8,
3185 231u8, 90u8, 0u8, 0u8,
3186 ];
3187 let data = vec![ByteArray::from(expected_value.clone()).into()];
3188 writer.write_batch(&data, None, None).unwrap();
3189 writer.flush_data_pages().unwrap();
3190
3191 let r = writer.close().unwrap();
3192
3193 let column_index = r.column_index.unwrap();
3196 let column_index_min_bytes = column_index.min_values[0].as_slice();
3197 let column_index_max_bytes = column_index.max_values[0].as_slice();
3198 assert_eq!(expected_value, column_index_min_bytes);
3199 assert_eq!(expected_value, column_index_max_bytes);
3200
3201 let stats = r.metadata.statistics().unwrap();
3203 if let Statistics::FixedLenByteArray(stats) = stats {
3204 let stats_min_bytes = stats.min_bytes_opt().unwrap();
3205 let stats_max_bytes = stats.max_bytes_opt().unwrap();
3206 assert_eq!(expected_value, stats_min_bytes);
3207 assert_eq!(expected_value, stats_max_bytes);
3208 } else {
3209 panic!("expecting Statistics::FixedLenByteArray");
3210 }
3211 }
3212
3213 #[test]
3214 fn test_statistics_truncating_byte_array() {
3215 let page_writer = get_test_page_writer();
3216
3217 const TEST_TRUNCATE_LENGTH: usize = 1;
3218
3219 let builder =
3221 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3222 let props = Arc::new(builder.build());
3223 let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);
3224
3225 let mut data = vec![ByteArray::default(); 1];
3226 data[0].set_data(Bytes::from(String::from("Blart Versenwald III")));
3228
3229 writer.write_batch(&data, None, None).unwrap();
3230
3231 writer.flush_data_pages().unwrap();
3232
3233 let r = writer.close().unwrap();
3234
3235 assert_eq!(1, r.rows_written);
3236
3237 let stats = r.metadata.statistics().expect("statistics");
3238 assert_eq!(stats.null_count_opt(), Some(0));
3239 assert_eq!(stats.distinct_count_opt(), None);
3240 if let Statistics::ByteArray(_stats) = stats {
3241 let min_value = _stats.min_opt().unwrap();
3242 let max_value = _stats.max_opt().unwrap();
3243
3244 assert!(!_stats.min_is_exact());
3245 assert!(!_stats.max_is_exact());
3246
3247 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3248 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3249
3250 assert_eq!("B".as_bytes(), min_value.as_bytes());
3251 assert_eq!("C".as_bytes(), max_value.as_bytes());
3252 } else {
3253 panic!("expecting Statistics::ByteArray");
3254 }
3255 }
3256
3257 #[test]
3258 fn test_statistics_truncating_fixed_len_byte_array() {
3259 let page_writer = get_test_page_writer();
3260
3261 const TEST_TRUNCATE_LENGTH: usize = 1;
3262
3263 let builder =
3265 WriterProperties::builder().set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH));
3266 let props = Arc::new(builder.build());
3267 let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
3268
3269 let mut data = vec![FixedLenByteArray::default(); 1];
3270
3271 const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
3272 const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();
3273
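// truncation keeps only the first byte; the truncated max has that byte incremented
// so that it still bounds the original value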
3274 const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] =
3276 [PSEUDO_DECIMAL_BYTES[0].overflowing_add(1).0];
3277
3278 data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));
3280
3281 writer.write_batch(&data, None, None).unwrap();
3282
3283 writer.flush_data_pages().unwrap();
3284
3285 let r = writer.close().unwrap();
3286
3287 assert_eq!(1, r.rows_written);
3288
3289 let stats = r.metadata.statistics().expect("statistics");
3290 assert_eq!(stats.null_count_opt(), Some(0));
3291 assert_eq!(stats.distinct_count_opt(), None);
3292 if let Statistics::FixedLenByteArray(_stats) = stats {
3293 let min_value = _stats.min_opt().unwrap();
3294 let max_value = _stats.max_opt().unwrap();
3295
3296 assert!(!_stats.min_is_exact());
3297 assert!(!_stats.max_is_exact());
3298
3299 assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
3300 assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);
3301
3302 assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
3303 assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());
3304
3305 let reconstructed_min = i128::from_be_bytes([
3306 min_value.as_bytes()[0],
3307 0,
3308 0,
3309 0,
3310 0,
3311 0,
3312 0,
3313 0,
3314 0,
3315 0,
3316 0,
3317 0,
3318 0,
3319 0,
3320 0,
3321 0,
3322 ]);
3323
3324 let reconstructed_max = i128::from_be_bytes([
3325 max_value.as_bytes()[0],
3326 0,
3327 0,
3328 0,
3329 0,
3330 0,
3331 0,
3332 0,
3333 0,
3334 0,
3335 0,
3336 0,
3337 0,
3338 0,
3339 0,
3340 0,
3341 ]);
3342
3343 println!("min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}");
3345 assert!(reconstructed_min <= PSEUDO_DECIMAL_VALUE);
3346 println!("max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}");
3347 assert!(reconstructed_max >= PSEUDO_DECIMAL_VALUE);
3348 } else {
3349 panic!("expecting Statistics::FixedLenByteArray");
3350 }
3351 }
3352
3353 #[test]
3354 fn test_send() {
3355 fn test<T: Send>() {}
3356 test::<ColumnWriterImpl<Int32Type>>();
3357 }
3358
3359 #[test]
3360 fn test_increment() {
3361 let v = increment(vec![0, 0, 0]).unwrap();
3362 assert_eq!(&v, &[0, 0, 1]);
3363
3364 let v = increment(vec![0, 255, 255]).unwrap();
3366 assert_eq!(&v, &[1, 0, 0]);
3367
3368 let v = increment(vec![255, 255, 255]);
3370 assert!(v.is_none());
3371 }
3372
3373 #[test]
3374 fn test_increment_utf8() {
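// `test_inc` checks that the increment produces the expected string, that it is valid
// UTF-8, and that it compares greater than the original both as `str` and as `ByteArray`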
3375 let test_inc = |o: &str, expected: &str| {
3376 if let Ok(v) = String::from_utf8(increment_utf8(o).unwrap()) {
3377 assert_eq!(v, expected);
3379 assert!(*v > *o);
3381 let mut greater = ByteArray::new();
3383 greater.set_data(Bytes::from(v));
3384 let mut original = ByteArray::new();
3385 original.set_data(Bytes::from(o.as_bytes().to_vec()));
3386 assert!(greater > original);
3387 } else {
3388 panic!("Expected incremented UTF8 string to also be valid.");
3389 }
3390 };
3391
3392 test_inc("hello", "hellp");
3394
3395 test_inc("a\u{7f}", "b");
3397
3398 assert!(increment_utf8("\u{7f}\u{7f}").is_none());
3400
3401 test_inc("❤️🧡💛💚💙💜", "❤️🧡💛💚💙💝");
3403
3404 test_inc("éééé", "éééê");
3406
3407 test_inc("\u{ff}\u{ff}", "\u{ff}\u{100}");
3409
3410 test_inc("a\u{7ff}", "b");
3412
3413 assert!(increment_utf8("\u{7ff}\u{7ff}").is_none());
3415
3416 test_inc("ࠀࠀ", "ࠀࠁ");
3419
3420 test_inc("a\u{ffff}", "b");
3422
3423 assert!(increment_utf8("\u{ffff}\u{ffff}").is_none());
3425
3426 test_inc("𐀀𐀀", "𐀀𐀁");
3428
3429 test_inc("a\u{10ffff}", "b");
3431
3432 assert!(increment_utf8("\u{10ffff}\u{10ffff}").is_none());
3434
3435 test_inc("a\u{D7FF}", "b");
3438 }
3439
3440 #[test]
3441 fn test_truncate_utf8() {
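// truncation must end on a UTF-8 character boundary; truncate_and_increment_utf8 also
// increments the truncated value and returns None when no larger value fits the limit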
3442 let data = "❤️🧡💛💚💙💜";
3444 let r = truncate_utf8(data, data.len()).unwrap();
3445 assert_eq!(r.len(), data.len());
3446 assert_eq!(&r, data.as_bytes());
3447
3448 let r = truncate_utf8(data, 13).unwrap();
3450 assert_eq!(r.len(), 10);
3451 assert_eq!(&r, "❤️🧡".as_bytes());
3452
3453 let r = truncate_utf8("\u{0836}", 1);
3455 assert!(r.is_none());
3456
3457 let r = truncate_and_increment_utf8("yyyyyyyyy", 8).unwrap();
3460 assert_eq!(&r, "yyyyyyyz".as_bytes());
3461
3462 let r = truncate_and_increment_utf8("ééééé", 7).unwrap();
3464 assert_eq!(&r, "ééê".as_bytes());
3465
3466 let r = truncate_and_increment_utf8("\u{ff}\u{ff}\u{ff}\u{ff}\u{ff}", 8).unwrap();
3468 assert_eq!(&r, "\u{ff}\u{ff}\u{ff}\u{100}".as_bytes());
3469
3470 let r = truncate_and_increment_utf8("߿߿߿߿߿", 8);
3472 assert!(r.is_none());
3473
3474 let r = truncate_and_increment_utf8("ࠀࠀࠀࠀ", 8).unwrap();
3477 assert_eq!(&r, "ࠀࠁ".as_bytes());
3478
3479 let r = truncate_and_increment_utf8("\u{ffff}\u{ffff}\u{ffff}", 8);
3481 assert!(r.is_none());
3482
3483 let r = truncate_and_increment_utf8("𐀀𐀀𐀀𐀀", 9).unwrap();
3485 assert_eq!(&r, "𐀀𐀁".as_bytes());
3486
3487 let r = truncate_and_increment_utf8("\u{10ffff}\u{10ffff}", 8);
3489 assert!(r.is_none());
3490 }
3491
3492 #[test]
3493 fn test_byte_array_truncate_invalid_utf8_statistics() {
3496 let message_type = "
3497 message test_schema {
3498 OPTIONAL BYTE_ARRAY a (UTF8);
3499 }
3500 ";
3501 let schema = Arc::new(parse_message_type(message_type).unwrap());
3502
3503 let data = vec![ByteArray::from(vec![128u8; 32]); 7];
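// 0x80 bytes are not valid UTF-8, so truncation falls back to binary truncation,
// incrementing the last byte of the truncated max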
3505 let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3506 let file: File = tempfile::tempfile().unwrap();
3507 let props = Arc::new(
3508 WriterProperties::builder()
3509 .set_statistics_enabled(EnabledStatistics::Chunk)
3510 .set_statistics_truncate_length(Some(8))
3511 .build(),
3512 );
3513
3514 let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3515 let mut row_group_writer = writer.next_row_group().unwrap();
3516
3517 let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3518 col_writer
3519 .typed::<ByteArrayType>()
3520 .write_batch(&data, Some(&def_levels), None)
3521 .unwrap();
3522 col_writer.close().unwrap();
3523 row_group_writer.close().unwrap();
3524 let file_metadata = writer.close().unwrap();
3525 assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
3526 let stats = file_metadata.row_groups[0].columns[0]
3527 .meta_data
3528 .as_ref()
3529 .unwrap()
3530 .statistics
3531 .as_ref()
3532 .unwrap();
3533 assert!(!stats.is_max_value_exact.unwrap());
3534 assert_eq!(
3537 stats.max_value,
3538 Some([128, 128, 128, 128, 128, 128, 128, 129].to_vec())
3539 );
3540 }
3541
3542 #[test]
3543 fn test_increment_max_binary_chars() {
3544 let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
3545 assert_eq!(&r.unwrap(), &[0xFF, 0xFE, 0xFE, 0x00, 0x00]);
3546
3547 let incremented = increment(vec![0xFF, 0xFF, 0xFF]);
3548 assert!(incremented.is_none())
3549 }
3550
3551 #[test]
3552 fn test_no_column_index_when_stats_disabled() {
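// with statistics disabled no column index is written, but the offset index still is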
3553 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3557 let props = Arc::new(
3558 WriterProperties::builder()
3559 .set_statistics_enabled(EnabledStatistics::None)
3560 .build(),
3561 );
3562 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3563 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3564
3565 let data = Vec::new();
3566 let def_levels = vec![0; 10];
3567 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3568 writer.flush_data_pages().unwrap();
3569
3570 let column_close_result = writer.close().unwrap();
3571 assert!(column_close_result.offset_index.is_some());
3572 assert!(column_close_result.column_index.is_none());
3573 }
3574
3575 #[test]
3576 fn test_no_offset_index_when_disabled() {
3577 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3579 let props = Arc::new(
3580 WriterProperties::builder()
3581 .set_statistics_enabled(EnabledStatistics::None)
3582 .set_offset_index_disabled(true)
3583 .build(),
3584 );
3585 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3586 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3587
3588 let data = Vec::new();
3589 let def_levels = vec![0; 10];
3590 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3591 writer.flush_data_pages().unwrap();
3592
3593 let column_close_result = writer.close().unwrap();
3594 assert!(column_close_result.offset_index.is_none());
3595 assert!(column_close_result.column_index.is_none());
3596 }
3597
3598 #[test]
3599 fn test_offset_index_overridden() {
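// page-level statistics require the offset index, so set_offset_index_disabled(true)
// is ignored and both indexes are written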
3600 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
3602 let props = Arc::new(
3603 WriterProperties::builder()
3604 .set_statistics_enabled(EnabledStatistics::Page)
3605 .set_offset_index_disabled(true)
3606 .build(),
3607 );
3608 let column_writer = get_column_writer(descr, props, get_test_page_writer());
3609 let mut writer = get_typed_column_writer::<Int32Type>(column_writer);
3610
3611 let data = Vec::new();
3612 let def_levels = vec![0; 10];
3613 writer.write_batch(&data, Some(&def_levels), None).unwrap();
3614 writer.flush_data_pages().unwrap();
3615
3616 let column_close_result = writer.close().unwrap();
3617 assert!(column_close_result.offset_index.is_some());
3618 assert!(column_close_result.column_index.is_some());
3619 }
3620
3621 #[test]
3622 fn test_boundary_order() -> Result<()> {
3623 let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
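// ascending: page min and max values never decrease from page to page
// (pages containing only nulls are skipped when determining the order)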
3624 let column_close_result = write_multiple_pages::<Int32Type>(
3626 &descr,
3627 &[
3628 &[Some(-10), Some(10)],
3629 &[Some(-5), Some(11)],
3630 &[None],
3631 &[Some(-5), Some(11)],
3632 ],
3633 )?;
3634 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3635 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3636
3637 let column_close_result = write_multiple_pages::<Int32Type>(
3639 &descr,
3640 &[
3641 &[Some(10), Some(11)],
3642 &[Some(5), Some(11)],
3643 &[None],
3644 &[Some(-5), Some(0)],
3645 ],
3646 )?;
3647 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3648 assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3649
3650 let column_close_result = write_multiple_pages::<Int32Type>(
3652 &descr,
3653 &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
3654 )?;
3655 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3656 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3657
3658 let column_close_result =
3660 write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
3661 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3662 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3663
3664 let column_close_result =
3666 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
3667 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3668 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3669
3670 let column_close_result =
3672 write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
3673 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3674 assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
3675
3676 let column_close_result = write_multiple_pages::<Int32Type>(
3678 &descr,
3679 &[
3680 &[Some(10), Some(11)],
3681 &[Some(11), Some(16)],
3682 &[None],
3683 &[Some(-5), Some(0)],
3684 ],
3685 )?;
3686 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3687 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3688
3689 let column_close_result = write_multiple_pages::<Int32Type>(
3691 &descr,
3692 &[
3693 &[Some(1), Some(9)],
3694 &[Some(2), Some(8)],
3695 &[None],
3696 &[Some(3), Some(7)],
3697 ],
3698 )?;
3699 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3700 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3701
3702 Ok(())
3703 }
3704
3705 #[test]
3706 fn test_boundary_order_logical_type() -> Result<()> {
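// the same bytes sort differently depending on the logical type: interpreted as FLOAT16
// the pages are descending, while as plain FIXED_LEN_BYTE_ARRAY they are unordered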
3707 let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
3710 let fba_descr = {
3711 let tpe = SchemaType::primitive_type_builder(
3712 "col",
3713 FixedLenByteArrayType::get_physical_type(),
3714 )
3715 .with_length(2)
3716 .build()?;
3717 Arc::new(ColumnDescriptor::new(
3718 Arc::new(tpe),
3719 1,
3720 0,
3721 ColumnPath::from("col"),
3722 ))
3723 };
3724
3725 let values: &[&[Option<FixedLenByteArray>]] = &[
3726 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
3727 &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
3728 &[Some(FixedLenByteArray::from(ByteArray::from(
3729 f16::NEG_ZERO,
3730 )))],
3731 &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
3732 ];
3733
3734 let column_close_result =
3736 write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
3737 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3738 assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
3739
3740 let column_close_result =
3742 write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
3743 let boundary_order = column_close_result.column_index.unwrap().boundary_order;
3744 assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
3745
3746 Ok(())
3747 }
3748
3749 #[test]
3750 fn test_interval_stats_should_not_have_min_max() {
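// INTERVAL columns have an undefined sort order, so no min/max statistics are written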
3751 let input = [
3752 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
3753 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1],
3754 vec![0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2],
3755 ]
3756 .into_iter()
3757 .map(|s| ByteArray::from(s).into())
3758 .collect::<Vec<_>>();
3759
3760 let page_writer = get_test_page_writer();
3761 let mut writer = get_test_interval_column_writer(page_writer);
3762 writer.write_batch(&input, None, None).unwrap();
3763
3764 let metadata = writer.close().unwrap().metadata;
3765 let stats = if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
3766 stats.clone()
3767 } else {
3768 panic!("metadata missing statistics");
3769 };
3770 assert!(stats.min_bytes_opt().is_none());
3771 assert!(stats.max_bytes_opt().is_none());
3772 }
3773
3774 #[test]
3775 #[cfg(feature = "arrow")]
3776 fn test_column_writer_get_estimated_total_bytes() {
3777 let page_writer = get_test_page_writer();
3778 let props = Default::default();
3779 let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
3780 assert_eq!(writer.get_estimated_total_bytes(), 0);
3781
3782 writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
3783 writer.add_data_page().unwrap();
3784 let size_with_one_page = writer.get_estimated_total_bytes();
3785 assert_eq!(size_with_one_page, 20);
3786
3787 writer.write_batch(&[5, 6, 7, 8], None, None).unwrap();
3788 writer.add_data_page().unwrap();
3789 let size_with_two_pages = writer.get_estimated_total_bytes();
3790 assert_eq!(size_with_two_pages, 20 + 21);
3792 }
3793
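/// Writes each provided slice as its own data page (values get definition level 1,
/// nulls get 0) and returns the writer's close result.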
3794 fn write_multiple_pages<T: DataType>(
3795 column_descr: &Arc<ColumnDescriptor>,
3796 pages: &[&[Option<T::T>]],
3797 ) -> Result<ColumnCloseResult> {
3798 let column_writer = get_column_writer(
3799 column_descr.clone(),
3800 Default::default(),
3801 get_test_page_writer(),
3802 );
3803 let mut writer = get_typed_column_writer::<T>(column_writer);
3804
3805 for &page in pages {
3806 let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
3807 let def_levels = page
3808 .iter()
3809 .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
3810 .collect::<Vec<_>>();
3811 writer.write_batch(&values, Some(&def_levels), None)?;
3812 writer.flush_data_pages()?;
3813 }
3814
3815 writer.close()
3816 }
3817
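/// Performs a write/read roundtrip with randomly generated values and levels.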
3818 fn column_roundtrip_random<T: DataType>(
3822 props: WriterProperties,
3823 max_size: usize,
3824 min_value: T::T,
3825 max_value: T::T,
3826 max_def_level: i16,
3827 max_rep_level: i16,
3828 ) where
3829 T::T: PartialOrd + SampleUniform + Copy,
3830 {
3831 let mut num_values: usize = 0;
3832
3833 let mut buf: Vec<i16> = Vec::new();
3834 let def_levels = if max_def_level > 0 {
3835 random_numbers_range(max_size, 0, max_def_level + 1, &mut buf);
3836 for &dl in &buf[..] {
3837 if dl == max_def_level {
3838 num_values += 1;
3839 }
3840 }
3841 Some(&buf[..])
3842 } else {
3843 num_values = max_size;
3844 None
3845 };
3846
3847 let mut buf: Vec<i16> = Vec::new();
3848 let rep_levels = if max_rep_level > 0 {
3849 random_numbers_range(max_size, 0, max_rep_level + 1, &mut buf);
3850 buf[0] = 0; // the first repetition level must be 0 (start of a new record)
Some(&buf[..])
3852 } else {
3853 None
3854 };
3855
3856 let mut values: Vec<T::T> = Vec::new();
3857 random_numbers_range(num_values, min_value, max_value, &mut values);
3858
3859 column_roundtrip::<T>(props, &values[..], def_levels, rep_levels);
3860 }
3861
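/// Writes the given values and levels to a temporary file, reads them back and asserts
/// that values, levels and row counts round-trip.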
3862 fn column_roundtrip<T: DataType>(
3864 props: WriterProperties,
3865 values: &[T::T],
3866 def_levels: Option<&[i16]>,
3867 rep_levels: Option<&[i16]>,
3868 ) {
3869 let mut file = tempfile::tempfile().unwrap();
3870 let mut write = TrackedWrite::new(&mut file);
3871 let page_writer = Box::new(SerializedPageWriter::new(&mut write));
3872
3873 let max_def_level = match def_levels {
3874 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3875 None => 0i16,
3876 };
3877
3878 let max_rep_level = match rep_levels {
3879 Some(buf) => *buf.iter().max().unwrap_or(&0i16),
3880 None => 0i16,
3881 };
3882
3883 let mut max_batch_size = values.len();
3884 if let Some(levels) = def_levels {
3885 max_batch_size = max_batch_size.max(levels.len());
3886 }
3887 if let Some(levels) = rep_levels {
3888 max_batch_size = max_batch_size.max(levels.len());
3889 }
3890
3891 let mut writer =
3892 get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
3893
3894 let values_written = writer.write_batch(values, def_levels, rep_levels).unwrap();
3895 assert_eq!(values_written, values.len());
3896 let result = writer.close().unwrap();
3897
3898 drop(write);
3899
3900 let props = ReaderProperties::builder()
3901 .set_backward_compatible_lz4(false)
3902 .build();
3903 let page_reader = Box::new(
3904 SerializedPageReader::new_with_properties(
3905 Arc::new(file),
3906 &result.metadata,
3907 result.rows_written as usize,
3908 None,
3909 Arc::new(props),
3910 )
3911 .unwrap(),
3912 );
3913 let mut reader = get_test_column_reader::<T>(page_reader, max_def_level, max_rep_level);
3914
3915 let mut actual_values = Vec::with_capacity(max_batch_size);
3916 let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size));
3917 let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size));
3918
3919 let (_, values_read, levels_read) = reader
3920 .read_records(
3921 max_batch_size,
3922 actual_def_levels.as_mut(),
3923 actual_rep_levels.as_mut(),
3924 &mut actual_values,
3925 )
3926 .unwrap();
3927
3928 assert_eq!(&actual_values[..values_read], values);
3931 match actual_def_levels {
3932 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), def_levels),
3933 None => assert_eq!(None, def_levels),
3934 }
3935 match actual_rep_levels {
3936 Some(ref vec) => assert_eq!(Some(&vec[..levels_read]), rep_levels),
3937 None => assert_eq!(None, rep_levels),
3938 }
3939
3940 if let Some(levels) = actual_rep_levels {
3943 let mut actual_rows_written = 0;
3944 for l in levels {
3945 if l == 0 {
3946 actual_rows_written += 1;
3947 }
3948 }
3949 assert_eq!(actual_rows_written, result.rows_written);
3950 } else if actual_def_levels.is_some() {
3951 assert_eq!(levels_read as u64, result.rows_written);
3952 } else {
3953 assert_eq!(values_read as u64, result.rows_written);
3954 }
3955 }
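
    // Illustrative usage sketch (not part of the original test suite): `column_roundtrip`
    // can also be driven with hand-written levels. Here an optional (max definition
    // level 1), non-repeated INT64 column stores three defined values across five slots;
    // the concrete values and level pattern are assumptions of this sketch.
    #[test]
    fn test_column_roundtrip_explicit_def_levels_sketch() {
        let props = WriterProperties::builder().build();
        column_roundtrip::<Int64Type>(props, &[1, 2, 3], Some(&[1, 0, 1, 0, 1]), None);
    }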

    /// Writes `values` with the given properties into a metadata-only test page writer
    /// and returns the resulting column chunk metadata.
    fn column_write_and_get_metadata<T: DataType>(
        props: WriterProperties,
        values: &[T::T],
    ) -> ColumnChunkMetaData {
        let page_writer = get_test_page_writer();
        let props = Arc::new(props);
        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
        writer.write_batch(values, None, None).unwrap();
        writer.close().unwrap().metadata
    }

    /// Convenience constructor for `PageEncodingStats`.
    fn encoding_stats(page_type: PageType, encoding: Encoding, count: i32) -> PageEncodingStats {
        PageEncodingStats {
            page_type,
            encoding,
            count,
        }
    }

    /// Writes `data` under the given writer version and dictionary setting, then asserts
    /// the expected dictionary page offset, encoding list, and per-page encoding stats.
    fn check_encoding_write_support<T: DataType>(
        version: WriterVersion,
        dict_enabled: bool,
        data: &[T::T],
        dictionary_page_offset: Option<i64>,
        encodings: &[Encoding],
        page_encoding_stats: &[PageEncodingStats],
    ) {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_dictionary_enabled(dict_enabled)
            .build();
        let meta = column_write_and_get_metadata::<T>(props, data);
        assert_eq!(meta.dictionary_page_offset(), dictionary_page_offset);
        assert_eq!(meta.encodings(), encodings);
        assert_eq!(meta.page_encoding_stats().unwrap(), page_encoding_stats);
    }
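
    // Illustrative sketch (not part of the original test suite): the metadata returned by
    // `column_write_and_get_metadata` carries the dictionary page offset, encoding list,
    // and page encoding stats that `check_encoding_write_support` asserts on. With the
    // dictionary disabled there should be no dictionary page; since the full encoding
    // list depends on the writer version, only PLAIN is checked here.
    #[test]
    fn test_metadata_exposes_encodings_sketch() {
        let props = WriterProperties::builder()
            .set_writer_version(WriterVersion::PARQUET_1_0)
            .set_dictionary_enabled(false)
            .build();
        let meta = column_write_and_get_metadata::<Int32Type>(props, &[1, 2, 3]);
        assert_eq!(meta.dictionary_page_offset(), None);
        assert!(meta.encodings().contains(&Encoding::PLAIN));
    }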

    /// Returns a typed column writer over a test column with the given maximum
    /// definition and repetition levels.
    fn get_test_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a typed column reader over the same test column.
    fn get_test_column_reader<T: DataType>(
        page_reader: Box<dyn PageReader>,
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnReaderImpl<T> {
        let descr = Arc::new(get_test_column_descr::<T>(max_def_level, max_rep_level));
        let column_reader = get_column_reader(descr, page_reader);
        get_typed_column_reader::<T>(column_reader)
    }

    /// Returns a descriptor for a single primitive test column named "col".
    fn get_test_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            // The length is only meaningful for FIXED_LEN_BYTE_ARRAY; other physical
            // types ignore it.
            .with_length(1)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    /// Returns a page writer that records page metadata but discards the page data.
    fn get_test_page_writer() -> Box<dyn PageWriter> {
        Box::new(TestPageWriter {})
    }

    /// Page writer stub used by tests that only inspect column metadata and statistics.
    struct TestPageWriter {}

    impl PageWriter for TestPageWriter {
        fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
            let mut res = PageWriteSpec::new();
            res.page_type = page.page_type();
            res.uncompressed_size = page.uncompressed_size();
            res.compressed_size = page.compressed_size();
            res.num_values = page.num_values();
            res.offset = 0;
            res.bytes_written = page.data().len() as u64;
            Ok(res)
        }

        fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Writes `values` with default properties and returns the statistics recorded in
    /// the column chunk metadata, panicking if none were written.
    fn statistics_roundtrip<T: DataType>(values: &[<T as DataType>::T]) -> Statistics {
        let page_writer = get_test_page_writer();
        let props = Default::default();
        let mut writer = get_test_column_writer::<T>(page_writer, 0, 0, props);
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(stats) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }
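
    // Illustrative sketch (not part of the original test suite): `statistics_roundtrip`
    // surfaces the min/max gathered while writing. The concrete values and the
    // `min_opt`/`max_opt` accessor names are assumptions of this sketch.
    #[test]
    fn test_statistics_roundtrip_int32_sketch() {
        match statistics_roundtrip::<Int32Type>(&[-1, 3, -2, 2]) {
            Statistics::Int32(stats) => {
                assert_eq!(stats.min_opt(), Some(&-2));
                assert_eq!(stats.max_opt(), Some(&3));
            }
            _ => panic!("expected i32 statistics"),
        }
    }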

    /// Returns a typed column writer over a decimal test column (precision 3, scale 2).
    fn get_test_decimals_column_writer<T: DataType>(
        page_writer: Box<dyn PageWriter>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, T> {
        let descr = Arc::new(get_test_decimals_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns a descriptor for the decimal test column (precision 3, scale 2); the
    /// 16-byte length only applies when the physical type is FIXED_LEN_BYTE_ARRAY.
    fn get_test_decimals_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_length(16)
            .with_logical_type(Some(LogicalType::Decimal {
                scale: 2,
                precision: 3,
            }))
            .with_scale(2)
            .with_precision(3)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    /// Writes Float16 values (2-byte FIXED_LEN_BYTE_ARRAY) and returns their value
    /// statistics, panicking if the metadata carries none.
    fn float16_statistics_roundtrip(
        values: &[FixedLenByteArray],
    ) -> ValueStatistics<FixedLenByteArray> {
        let page_writer = get_test_page_writer();
        let mut writer = get_test_float16_column_writer(page_writer, Default::default());
        writer.write_batch(values, None, None).unwrap();

        let metadata = writer.close().unwrap().metadata;
        if let Some(Statistics::FixedLenByteArray(stats)) = metadata.statistics() {
            stats.clone()
        } else {
            panic!("metadata missing statistics");
        }
    }

    /// Returns a column writer over a 2-byte FIXED_LEN_BYTE_ARRAY column with the
    /// Float16 logical type.
    fn get_test_float16_column_writer(
        page_writer: Box<dyn PageWriter>,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_float16_column_descr(0, 0));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    /// Returns the descriptor for the Float16 test column.
    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(2)
                .with_logical_type(Some(LogicalType::Float16))
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }

    /// Returns a column writer over a 12-byte FIXED_LEN_BYTE_ARRAY column with the
    /// INTERVAL converted type.
    fn get_test_interval_column_writer(
        page_writer: Box<dyn PageWriter>,
    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
        let descr = Arc::new(get_test_interval_column_descr());
        let column_writer = get_column_writer(descr, Default::default(), page_writer);
        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
    }

    /// Returns the descriptor for the INTERVAL test column.
    fn get_test_interval_column_descr() -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe =
            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
                .with_length(12)
                .with_converted_type(ConvertedType::INTERVAL)
                .build()
                .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), 0, 0, path)
    }

    /// Returns a typed column writer over a column declared with the UINT_32 converted type.
    fn get_test_unsigned_int_given_as_converted_column_writer<'a, T: DataType>(
        page_writer: Box<dyn PageWriter + 'a>,
        max_def_level: i16,
        max_rep_level: i16,
        props: WriterPropertiesPtr,
    ) -> ColumnWriterImpl<'a, T> {
        let descr = Arc::new(get_test_converted_type_unsigned_integer_column_descr::<T>(
            max_def_level,
            max_rep_level,
        ));
        let column_writer = get_column_writer(descr, props, page_writer);
        get_typed_column_writer::<T>(column_writer)
    }

    /// Returns the descriptor for the UINT_32 converted-type test column.
    fn get_test_converted_type_unsigned_integer_column_descr<T: DataType>(
        max_def_level: i16,
        max_rep_level: i16,
    ) -> ColumnDescriptor {
        let path = ColumnPath::from("col");
        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
            .with_converted_type(ConvertedType::UINT_32)
            .build()
            .unwrap();
        ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
    }
}